diff options
143 files changed, 1540 insertions, 791 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 61e0e55138..779a434332 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -79,6 +79,37 @@ </mechanisms> </sasl> </security> + <virtualhosts> + <virtualhost> + <name>localhost</name> + <localhost> + <store> + <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> --> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <environment-path>localhost-store</environment-path> + </store> + </localhost> + </virtualhost> + + <virtualhost> + <name>development</name> + <development> + <store> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + </store> + </development> + </virtualhost> + + <virtualhost> + <name>test</name> + <test> + <store> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + </store> + </test> + </virtualhost> + + </virtualhosts> <heartbeat> <delay>0</delay> <timeoutFactor>2.0</timeoutFactor> @@ -86,8 +117,6 @@ <queue> <auto_register>true</auto_register> </queue> - <store> - <class>org.apache.qpid.server.store.MemoryMessageStore</class> - </store> + <virtualhosts>${conf}/virtualhosts.xml</virtualhosts> </broker> diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml index de6a8c0682..50cddb5661 100644 --- a/java/broker/etc/virtualhosts.xml +++ b/java/broker/etc/virtualhosts.xml @@ -21,7 +21,17 @@ --> <virtualhosts> <virtualhost> - <path>/development</path> + <path>localhost</path> + <bind>direct://amq.direct//queue</bind> + <bind>direct://amq.direct//ping</bind> + </virtualhost> + <virtualhost> + <path>development</path> + <bind>direct://amq.direct//queue</bind> + <bind>direct://amq.direct//ping</bind> + </virtualhost> + <virtualhost> + <path>test</path> <bind>direct://amq.direct//queue</bind> <bind>direct://amq.direct//ping</bind> </virtualhost> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 6f93a14469..2e6293081d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -50,20 +51,28 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr private final ExchangeFactory _exchangeFactory; private final MessageStore _messageStore; + private final VirtualHost.VirtualHostMBean _virtualHostMBean; + @MBeanConstructor("Creates the Broker Manager MBean") - public AMQBrokerManagerMBean() throws JMException + public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException { super(ManagedBroker.class, ManagedBroker.TYPE); - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getQueueRegistry(); - _exchangeRegistry = appRegistry.getExchangeRegistry(); - _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); - _messageStore = ApplicationRegistry.getInstance().getMessageStore(); + + _virtualHostMBean = virtualHostMBean; + VirtualHost virtualHost = virtualHostMBean.getVirtualHost(); + + + + + _queueRegistry = virtualHost.getQueueRegistry(); + _exchangeRegistry = virtualHost.getExchangeRegistry(); + _messageStore = virtualHost.getMessageStore(); + _exchangeFactory = virtualHost.getExchangeFactory(); } public String getObjectInstanceName() { - return this.getClass().getName(); + return _virtualHostMBean.getVirtualHost().getName(); } /** @@ -144,7 +153,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { - queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, _queueRegistry); + queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost()); if (queue.isDurable() && !queue.isAutoDelete()) { _messageStore.createQueue(queue); @@ -157,6 +166,11 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } } + private VirtualHost getVirtualHost() + { + return _virtualHostMBean.getVirtualHost(); + } + /** * Deletes the queue from queue registry and persistant storage. * @@ -183,11 +197,17 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } } - public ObjectName getObjectName() throws MalformedObjectNameException + public ManagedObject getParentObject() { - StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); - objectName.append(":type=").append(getType()); - - return new ObjectName(objectName.toString()); + return _virtualHostMBean; } + +// public ObjectName getObjectName() throws MalformedObjectNameException +// { +// StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); +// objectName.append(".").append(getVirtualHost().getName()); +// objectName.append(":type=").append(getType()); +// +// return new ObjectName(objectName.toString()); +// } } // End of MBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index ffd25de0b4..c45d1ad2c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -62,7 +62,7 @@ import java.util.StringTokenizer; * Main entry point for AMQPD. * */ -public class Main implements ProtocolVersionList, Managable +public class Main implements ProtocolVersionList { private static final Logger _logger = Logger.getLogger(Main.class); @@ -70,7 +70,8 @@ public class Main implements ProtocolVersionList, Managable private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - private AMQBrokerManagerMBean _mbean = null; + + private static Main _instance; protected static class InitException extends Exception { @@ -265,7 +266,6 @@ public class Main implements ProtocolVersionList, Managable } bind(port, connectorConfig); - createAndRegisterBrokerMBean(); } protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException @@ -368,7 +368,7 @@ public class Main implements ProtocolVersionList, Managable public static void main(String[] args) { - new Main(args); + _instance = new Main(args); } private byte[] parseIP(String address) throws Exception @@ -430,21 +430,4 @@ public class Main implements ProtocolVersionList, Managable } } - private void createAndRegisterBrokerMBean() throws AMQException - { - try - { - _mbean = new AMQBrokerManagerMBean(); - _mbean.register(); - } - catch (JMException ex) - { - throw new AMQException("Exception occured in creating AMQBrokerManager MBean"); - } - } - - public ManagedObject getManagedObject() - { - return _mbean; - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java index 5e3ac03ba7..379da94aa3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java @@ -37,14 +37,15 @@ public class Configurator { private static final Logger _logger = Logger.getLogger(Configurator.class); + /** - * Configure a given instance using the application configuration. Note that superclasses are <b>not</b> + * Configure a given instance using the supplied configuration. Note that superclasses are <b>not</b> * currently configured but this could easily be added if required. * @param instance the instance to configure + * @param config the configuration to use to configure the object */ - public static void configure(Object instance) + public static void configure(Object instance, Configuration config) { - final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); for (Field f : instance.getClass().getDeclaredFields()) { @@ -56,6 +57,18 @@ public class Configurator } } + + + /** + * Configure a given instance using the application configuration. Note that superclasses are <b>not</b> + * currently configured but this could easily be added if required. + * @param instance the instance to configure + */ + public static void configure(Object instance) + { + configure(instance, ApplicationRegistry.getInstance().getConfiguration()); + } + private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation) { Class fieldClass = f.getType(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 7e807304c8..361a21b284 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.log4j.Logger; @@ -113,6 +114,7 @@ public class VirtualHostConfiguration } _logger.info("VirtualHost:'" + prop + "'"); + String virtualHost = prop.toString(); prop = _config.getProperty(path + "." + XML_BIND); if (prop instanceof Collection) @@ -121,16 +123,16 @@ public class VirtualHostConfiguration _logger.debug("Number of Bindings: " + bindings); for (int dest = 0; dest < bindings; dest++) { - loadBinding(path, dest); + loadBinding(virtualHost, path, dest); } } else { - loadBinding(path, -1); + loadBinding(virtualHost,path, -1); } } - private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException + private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException { String path = rootpath + "." + XML_BIND; if (index != -1) @@ -146,7 +148,7 @@ public class VirtualHostConfiguration try { - bind(binding); + bind(virtualHost, binding); } catch (AMQException amqe) { @@ -155,7 +157,7 @@ public class VirtualHostConfiguration } } - private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException + private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException { AMQShortString queueName = binding.getQueueName(); @@ -169,9 +171,10 @@ public class VirtualHostConfiguration } //Get references to Broker Registries - QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); - MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore(); - ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry(); + VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore messageStore = virtualHost.getMessageStore(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); synchronized (queueRegistry) { @@ -184,7 +187,7 @@ public class VirtualHostConfiguration queue = new AMQQueue(queueName, Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)), null /* These queues will have no owner */, - false /* Therefore autodelete makes no sence */, queueRegistry); + false /* Therefore autodelete makes no sence */, virtualHost); if (queue.isDurable()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 94c792c358..caafb83568 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,6 +25,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; @@ -34,10 +38,14 @@ public abstract class AbstractExchange implements Exchange, Managable { private AMQShortString _name; + + protected boolean _durable; protected String _exchangeType; protected int _ticket; + private VirtualHost _virtualHost; + protected ExchangeMBean _exchangeMbean; /** @@ -57,6 +65,11 @@ public abstract class AbstractExchange implements Exchange, Managable super(ManagedExchange.class, ManagedExchange.TYPE); } + public ManagedObject getParentObject() + { + return _virtualHost.getManagedObject(); + } + public String getObjectInstanceName() { return _name.toString(); @@ -87,13 +100,17 @@ public abstract class AbstractExchange implements Exchange, Managable return _autoDelete; } - public ObjectName getObjectName() throws MalformedObjectNameException +// public ObjectName getObjectName() throws MalformedObjectNameException +// { +// String objNameString = super.getObjectName().toString(); +// objNameString = objNameString + ",VirtualHost="+ _virtualHost.getName() +",ExchangeType=" + _exchangeType; +// return new ObjectName(objNameString); +// } + + protected ManagedObjectRegistry getManagedObjectRegistry() { - String objNameString = super.getObjectName().toString(); - objNameString = objNameString + ",ExchangeType=" + _exchangeType; - return new ObjectName(objNameString); + return ApplicationRegistry.getInstance().getManagedObjectRegistry(); } - } // End of MBean class public AMQShortString getName() @@ -108,8 +125,9 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract ExchangeMBean createMBean() throws AMQException; - public void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException + public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { + _virtualHost = host; _name = name; _durable = durable; _autoDelete = autoDelete; @@ -151,4 +169,13 @@ public abstract class AbstractExchange implements Exchange, Managable return _exchangeMbean; } + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public QueueRegistry getQueueRegistry() + { + return getVirtualHost().getQueueRegistry(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 77f9819048..d77f1b6c5a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -36,9 +37,11 @@ public class DefaultExchangeFactory implements ExchangeFactory private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); private Map<AMQShortString, Class<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, Class<? extends Exchange>>(); + private final VirtualHost _host; - public DefaultExchangeFactory() + public DefaultExchangeFactory(VirtualHost host) { + _host = host; _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); @@ -59,7 +62,7 @@ public class DefaultExchangeFactory implements ExchangeFactory try { Exchange e = exchClass.newInstance(); - e.initialise(exchange, durable, ticket, autoDelete); + e.initialise(_host, exchange, durable, ticket, autoDelete); return e; } catch (InstantiationException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index fcd6e8fdad..8862bd5104 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -109,7 +109,7 @@ public class DestNameExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index d1b35451b5..c38b9fe9b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import javax.management.MBeanException; @@ -110,7 +111,7 @@ public class DestWildExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 366dcb11b3..c012a1c1c9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -25,13 +25,14 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; public interface Exchange { AMQShortString getName(); AMQShortString getType(); - void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; + void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; boolean isDurable(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 2e7457e4a6..82039c345f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -44,7 +44,7 @@ public class FanoutExchange extends AbstractExchange private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
- @MBeanConstructor("Creates an MBean for AMQ direct exchange")
+ @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
public FanoutExchangeMBean() throws JMException
{
super();
@@ -86,7 +86,7 @@ public class FanoutExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 93933cd88d..3a49ff586b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -150,7 +150,7 @@ public class HeadersExchange extends AbstractExchange */ public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index e12dd4a9db..b2b1b0a716 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.log4j.Logger; public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody> @@ -46,10 +47,10 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicAckBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { + AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + if (_log.isDebugEnabled()) { _log.debug("Ack received on channel " + evt.getChannelId()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index a23a29941f..2bbb696e90 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.AMQException; public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> @@ -45,10 +46,10 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicCancelBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException { + AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); final BasicCancelBody body = evt.getMethod(); channel.unsubscribeConsumer(protocolSession, body.consumerTag); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 721001b454..bc695431c9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -28,6 +28,8 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -53,14 +55,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + BasicConsumeBody body = evt.getMethod(); final int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); + VirtualHost vHost = session.getVirtualHost(); if (channel == null) { _log.error("Channel " + channelId + " not found"); @@ -69,7 +72,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { - AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); + AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue); if (queue == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 1eb3152973..51b585ecc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -10,6 +10,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
@@ -31,12 +33,13 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB {
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<BasicGetBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicGetBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
BasicGetBody body = evt.getMethod();
final int channelId = evt.getChannelId();
+ VirtualHost vHost = session.getVirtualHost();
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -46,7 +49,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB }
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 0ef30be265..78a246d80e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -28,6 +28,8 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; @@ -53,10 +55,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicPublishBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicPublishBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + final BasicPublishBody body = evt.getMethod(); if (_log.isDebugEnabled()) @@ -70,7 +72,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME; } - Exchange e = exchangeRegistry.getExchange(body.exchange); + VirtualHost vHost = session.getVirtualHost(); + Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange); // if the exchange does not exist we raise a channel exception if (e == null) { @@ -82,8 +85,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // The partially populated BasicDeliver frame plus the received route body // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - channel.setPublishFrame(body, protocolSession); + AMQChannel channel = session.getChannel(evt.getChannelId()); + channel.setPublishFrame(body, session); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 2bab4cac5c..325e5226ad 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; @@ -40,9 +41,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> return _instance; } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, - AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index f3e0cc3a63..a247ee33ce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.framing.BasicRecoverBody; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -42,18 +43,18 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic return _instance; } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicRecoverBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException { - _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId()); - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQProtocolSession session = stateManager.getProtocolSession(); + + _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); if (channel == null) { throw new AMQException("Unknown channel " + evt.getChannelId()); } BasicRecoverBody body = evt.getMethod(); - channel.resend(protocolSession, body.requeue); + channel.resend(session, body.requeue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index a24ecd9b01..d46a4f6424 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> { @@ -47,18 +48,17 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelCloseBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ChannelCloseBody body = evt.getMethod(); _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); - protocolSession.closeChannel(evt.getChannelId()); + session.closeChannel(evt.getChannelId()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java index 81a5371829..be11d5e939 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.log4j.Logger; public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody> @@ -45,9 +46,7 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index d92a4eed6a..62f7ed4b78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -48,13 +48,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelFlowBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ChannelFlowBody body = evt.getMethod(); - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); @@ -64,6 +63,6 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0, // AMQP version (major, minor) body.active); // active - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 6ea9dfa595..5cd3f8ac89 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -47,18 +48,18 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelOpenBody> evt) throws AMQException - { - IApplicationRegistry registry = ApplicationRegistry.getInstance(); - final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(), - exchangeRegistry); - protocolSession.addChannel(channel); + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + + final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(), + virtualHost.getExchangeRegistry()); + session.addChannel(channel); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 0fe25a1c89..8bc849d5cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -47,16 +47,15 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionCloseBody body = evt.getMethod(); _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + protocolSession); + body.replyText + " for " + session); try { - protocolSession.closeSession(); + session.closeSession(); } catch (Exception e) { @@ -66,6 +65,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index bcdd86d2ef..c10a731cc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -46,17 +46,16 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener< { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); //todo should this not do more than just log the method? _logger.info("Received Connection-close-ok"); try { stateManager.changeState(AMQState.CONNECTION_CLOSED); - protocolSession.closeSession(); + session.closeSession(); } catch (Exception e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 8056ff9adb..88717c446b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -27,11 +27,13 @@ import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody> { @@ -51,28 +53,39 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con return new AMQShortString(Long.toString(System.currentTimeMillis())); } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ConnectionOpenBody body = evt.getMethod(); - + String virtualHostName = String.valueOf(body.virtualHost); - //todo //FIXME The virtual host must be validated by the server for the connection to open-ok - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (protocolSession.getContextKey() == null) + VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName); + + if(virtualHost == null) { - protocolSession.setContextKey(generateClientID()); + throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName); } + else + { + session.setVirtualHost( virtualHost ); + - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, - (byte)8, (byte)0, // AMQP version (major, minor) - body.virtualHost); // knownHosts - stateManager.changeState(AMQState.CONNECTION_OPEN); - protocolSession.writeFrame(response); + + // See Spec (0.8.2). Section 3.1.2 Virtual Hosts + if (session.getContextKey() == null) + { + session.setContextKey(generateClientID()); + } + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, + (byte)8, (byte)0, // AMQP version (major, minor) + body.virtualHost); + stateManager.changeState(AMQState.CONNECTION_OPEN); + session.writeFrame(response); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index d33874b727..11cbaade30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -54,14 +54,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ConnectionSecureOkBody body = evt.getMethod(); AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager(); - SaslServer ss = protocolSession.getSaslServer(); + SaslServer ss = session.getSaslServer(); if (ss == null) { throw new AMQException("No SASL context set up in session"); @@ -84,8 +83,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId AMQConstant.NOT_ALLOWED.getCode(), // replyCode AMQConstant.NOT_ALLOWED.getName()); // replyText - protocolSession.writeFrame(close); - disposeSaslServer(protocolSession); + session.writeFrame(close); + disposeSaslServer(session); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); @@ -101,8 +100,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener Integer.MAX_VALUE, // channelMax ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax HeartbeatConfig.getInstance().getDelay()); // heartbeat - protocolSession.writeFrame(tune); - disposeSaslServer(protocolSession); + session.writeFrame(tune); + disposeSaslServer(session); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); @@ -112,7 +111,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, (byte)8, (byte)0, // AMQP version (major, minor) authResult.challenge); // challenge - protocolSession.writeFrame(challenge); + session.writeFrame(challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 6cb384f081..b45a017166 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -60,10 +60,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionStartOkBody body = evt.getMethod(); _logger.info("SASL Mechanism selected: " + body.mechanism); _logger.info("Locale selected: " + body.locale); @@ -73,15 +72,15 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< SaslServer ss = null; try { - ss = authMgr.createSaslServer(String.valueOf(body.mechanism), protocolSession.getLocalFQDN()); - protocolSession.setSaslServer(ss); + ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN()); + session.setSaslServer(ss); AuthenticationResult authResult = authMgr.authenticate(ss, body.response); //save clientProperties - if (protocolSession.getClientProperties() == null) + if (session.getClientProperties() == null) { - protocolSession.setClientProperties(body.clientProperties); + session.setClientProperties(body.clientProperties); } switch (authResult.status) @@ -100,7 +99,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< Integer.MAX_VALUE, // channelMax getConfiguredFrameSize(), // frameMax HeartbeatConfig.getInstance().getDelay()); // heartbeat - protocolSession.writeFrame(tune); + session.writeFrame(tune); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); @@ -110,12 +109,12 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, (byte)8, (byte)0, // AMQP version (major, minor) authResult.challenge); // challenge - protocolSession.writeFrame(challenge); + session.writeFrame(challenge); } } catch (SaslException e) { - disposeSaslServer(protocolSession); + disposeSaslServer(session); throw new AMQException("SASL error: " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index 960643325a..020e93b7d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -42,16 +42,15 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C return _instance; } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ConnectionTuneOkBody body = evt.getMethod(); if (_logger.isDebugEnabled()) { _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - protocolSession.initHeartbeats(body.heartbeat); + session.initHeartbeats(body.heartbeat); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 596e6bf332..30da1398b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; /** * @author Apache Software Foundation @@ -61,10 +62,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. byte major = (byte)8; @@ -79,7 +82,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { throw new AMQException("Exchange exchange must not be null"); } - Exchange exchange = exchangeRegistry.getExchange(exchangeName); + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); AMQFrame response; if (exchange == null) { @@ -112,6 +115,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo } else { + AMQQueue queue = queueRegistry.getQueue(queueName); if (queue == null) { @@ -194,6 +198,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo " to exchange " + exchangeName)); // replyText } } - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 84e9a4e3f4..03af56ff14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody> { @@ -50,17 +51,19 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange return _instance; } - private final ExchangeFactory exchangeFactory; + private ExchangeDeclareHandler() { - exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final ExchangeDeclareBody body = evt.getMethod(); if (_logger.isDebugEnabled()) { @@ -106,7 +109,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 2faba57e04..fc2a4b1fd5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody> { @@ -45,10 +46,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeDeleteBody body = evt.getMethod(); try { @@ -57,7 +60,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } catch (ExchangeInUseException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 45180d0cb6..0a23b9bd86 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { @@ -50,15 +51,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<QueueBindBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + final QueueBindBody body = evt.getMethod(); final AMQQueue queue; if (body.queue == null) { - queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue(); + queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); if (queue == null) { throw new AMQException("No default queue defined on channel and queue was null"); @@ -94,7 +99,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b62fe22b89..fdf98bb49e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.configuration.Configured; @@ -37,7 +36,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicInteger; @@ -58,18 +57,21 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar private final AtomicInteger _counter = new AtomicInteger(); - private final MessageStore _store; + protected QueueDeclareHandler() { Configurator.configure(this); - _store = ApplicationRegistry.getInstance().getMessageStore(); } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore store = virtualHost.getMessageStore(); + QueueDeclareBody body = evt.getMethod(); // if we aren't given a queue name, we create one which we return to the client @@ -94,10 +96,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - queue = createQueue(body, queueRegistry, protocolSession); + queue = createQueue(body, virtualHost, session); if (queue.isDurable() && !queue.isAutoDelete()) { - _store.createQueue(queue); + store.createQueue(queue); } queueRegistry.registerQueue(queue); if (autoRegister) @@ -109,14 +111,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } } - else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner())) + else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) { // todo - constant throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); } //set this as the default queue on the channel: - protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue); + session.getChannel(evt.getChannelId()).setDefaultQueue(queue); } if (!body.nowait) @@ -130,7 +132,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queue.getMessageCount(), // messageCount body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); - protocolSession.writeFrame(response); + session.writeFrame(response); } } @@ -144,10 +146,43 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return MessageFormat.format("{0,number,0000000000000}", value); } - protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) + protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session) throws AMQException { + final QueueRegistry registry = virtualHost.getQueueRegistry(); AMQShortString owner = body.exclusive ? session.getContextKey() : null; - return new AMQQueue(body.queue, body.durable, owner, body.autoDelete || (!body.durable && body.exclusive), registry); + final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost); + final AMQShortString queueName = queue.getName(); + + if(body.exclusive && !body.durable) + { + final AMQProtocolSession.Task deleteQueueTask = + new AMQProtocolSession.Task() + { + + public void doTask(AMQProtocolSession session) throws AMQException + { + if(registry.getQueue(queueName) == queue) + { + queue.delete(); + } + + } + }; + + session.addSessionCloseTask(deleteQueueTask); + + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) + { + session.removeSessionCloseTask(deleteQueueTask); + } + }); + + + } + + return queue; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 245d86a7a6..3f0833de41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -24,18 +24,14 @@ import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; -import org.apache.qpid.protocol.AMQConstant; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -47,7 +43,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } private final boolean _failIfNotFound; - private final MessageStore _store; public QueueDeleteHandler() { @@ -57,12 +52,16 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete public QueueDeleteHandler(boolean failIfNotFound) { _failIfNotFound = failIfNotFound; - _store = ApplicationRegistry.getInstance().getMessageStore(); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore store = virtualHost.getMessageStore(); + QueueDeleteBody body = evt.getMethod(); AMQQueue queue; if(body.queue == null) @@ -71,7 +70,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } else { - queue = queues.getQueue(body.queue); + queue = queueRegistry.getQueue(body.queue); } if(queue == null) @@ -96,7 +95,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete else { int purged = queue.delete(body.ifUnused, body.ifEmpty); - _store.removeQueue(queue.getName()); + store.removeQueue(queue.getName()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index ad63d36351..c8341ee5b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -9,6 +9,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -34,8 +35,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod _failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -52,7 +57,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod }
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 68b0c584eb..6f6b8b6f3c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -47,10 +48,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<TxCommitBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -58,13 +58,13 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { _log.debug("Commit received on channel " + evt.getChannelId()); } - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); - channel.processReturns(protocolSession); + session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + channel.processReturns(session); } catch(AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index d71c93a6c6..31a28d2275 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody> { @@ -44,20 +45,20 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<TxRollbackBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + try{ - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); channel.rollback(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). - channel.resend(protocolSession, false); + channel.resend(session, false); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 7d66fa9d5c..30b70869f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { @@ -43,14 +44,14 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<TxSelectBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException { - protocolSession.getChannel(evt.getChannelId()).setLocalTransactional(); + AMQProtocolSession session = stateManager.getProtocolSession(); + + session.getChannel(evt.getChannelId()).setLocalTransactional(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java index 311eb8add9..46bac52e78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java @@ -67,7 +67,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana { try { - ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this); + getManagedObjectRegistry().registerObject(this); } catch (JMException e) { @@ -75,11 +75,16 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana } } + protected ManagedObjectRegistry getManagedObjectRegistry() + { + return ApplicationRegistry.getInstance().getManagedObjectRegistry(); + } + public void unregister() throws AMQException { try { - ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this); + getManagedObjectRegistry().unregisterObject(this); } catch (JMException e) { @@ -91,6 +96,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana { return getObjectInstanceName() + "[" + getType() + "]"; } + /** * Created the ObjectName as per the JMX Specs diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index ed74263596..9ca11abb56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -35,10 +35,10 @@ import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CopyOnWriteArrayList; public class AMQMinaProtocolSession implements AMQProtocolSession, ProtocolVersionList, @@ -65,16 +66,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private AMQShortString _contextKey; + private VirtualHost _virtualHost; + private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); private final AMQStateManager _stateManager; - private final QueueRegistry _queueRegistry; - - private final ExchangeRegistry _exchangeRegistry; - private AMQCodecFactory _codecFactory; private AMQProtocolSessionMBean _managedObject; @@ -93,6 +92,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private byte _major; private byte _minor; private FieldTable _clientProperties; + private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + public ManagedObject getManagedObject() { @@ -100,23 +101,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) throws AMQException { - _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this); + _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + + _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { @@ -124,8 +125,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minaProtocolSession = session; session.setAttachment(this); - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); @@ -461,6 +461,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for(Task task : _taskList) + { + task.doTask(this); + } } } @@ -556,4 +560,27 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _minaProtocolSession.getRemoteAddress(); } + + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 10e23caac3..474714680b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -53,41 +53,26 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); - /** - * The registry of all queues. This is passed to frame listeners when frame - * events occur. - */ - private final QueueRegistry _queueRegistry; + private final IApplicationRegistry _applicationRegistry; - /** - * The registry of all exchanges. This is passed to frame listeners when frame - * events occur. - */ - private final ExchangeRegistry _exchangeRegistry; private boolean _useSSL; public AMQPFastProtocolHandler(Integer applicationRegistryInstance) { - IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance); - - _queueRegistry = registry.getQueueRegistry(); - _exchangeRegistry = registry.getExchangeRegistry(); - _logger.debug("AMQPFastProtocolHandler created"); + this(ApplicationRegistry.getInstance(applicationRegistryInstance)); } - public AMQPFastProtocolHandler(QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) + public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) { - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + _applicationRegistry = applicationRegistry; _logger.debug("AMQPFastProtocolHandler created"); } protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler) { - this(handler._queueRegistry, handler._exchangeRegistry); + this(handler._applicationRegistry); } public void sessionCreated(IoSession protocolSession) throws Exception @@ -95,7 +80,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco SessionUtil.initialize(protocolSession); final AMQCodecFactory codecFactory = new AMQCodecFactory(true); - createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory); + createSession(protocolSession, _applicationRegistry, codecFactory); _logger.info("Protocol session created"); final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory); @@ -120,9 +105,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco /** * Separated into its own, protected, method to allow easier reuse */ - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException { - new AMQMinaProtocolSession(session, queues, exchanges, codec); + new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); } public void sessionOpened(IoSession protocolSession) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java index ff1316f704..07c153bfe8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java @@ -42,8 +42,7 @@ public class AMQPProtocolProvider public AMQPProtocolProvider() { IApplicationRegistry registry = ApplicationRegistry.getInstance(); - _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(), - registry.getExchangeRegistry()); + _handler = new AMQPFastProtocolHandler(registry); } public AMQPFastProtocolHandler getHandler() diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index a1249723ee..ee7e46eba4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; import javax.security.sasl.SaslServer; @@ -32,6 +33,13 @@ import javax.security.sasl.SaslServer; public interface AMQProtocolSession extends AMQProtocolWriter { + + + public static interface Task + { + public void doTask(AMQProtocolSession session) throws AMQException; + } + /** * Called when a protocol data block is received * @param message the data block that has been received @@ -126,4 +134,13 @@ public interface AMQProtocolSession extends AMQProtocolWriter void setClientProperties(FieldTable clientProperties); Object getClientIdentifier(); + + VirtualHost getVirtualHost(); + + void setVirtualHost(VirtualHost virtualHost); + + void addSessionCloseTask(Task task); + + void removeSessionCloseTask(Task task); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index df494915a3..b5fec39626 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.ManagedObject; import javax.management.JMException; import javax.management.MBeanException; @@ -93,6 +94,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed return _session.getIOSession().getRemoteAddress().toString(); } + public ManagedObject getParentObject() + { + return _session.getVirtualHost().getManagedObject(); + } + public Long getWrittenBytes() { return _session.getIOSession().getWrittenBytes(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 18b3adc635..709dd28ad5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -30,11 +30,14 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; +import java.util.ArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException { @@ -95,6 +99,12 @@ public class AMQQueue implements Managable, Comparable private final AtomicBoolean _isExclusive = new AtomicBoolean(); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + + private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + /** * Manages message delivery. @@ -102,11 +112,6 @@ public class AMQQueue implements Managable, Comparable private final DeliveryManager _deliveryMgr; /** - * The queue registry with which this queue is registered. - */ - private final QueueRegistry _queueRegistry; - - /** * Used to track bindings to exchanges so that on deletion they can easily * be cancelled. */ @@ -119,6 +124,9 @@ public class AMQQueue implements Managable, Comparable private final AMQQueueMBean _managedObject; + private final VirtualHost _virtualHost; + + /** * max allowed size(KB) of a single message */ @@ -145,59 +153,26 @@ public class AMQQueue implements Managable, Comparable } public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry) - throws AMQException - { - this(name, durable, owner, autoDelete, queueRegistry, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory()); - } - - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory) + boolean autoDelete, VirtualHost virtualHost) throws AMQException { - this(name, durable, owner, autoDelete, queueRegistry, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory); + this(name, durable, owner, autoDelete, virtualHost, + AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory()); } - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery, - SubscriptionFactory subscriptionFactory) - throws AMQException - { - - this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory); - } - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - - this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), - new SubscriptionImpl.Factory()); - } - - protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, - SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) - throws AMQException - { - this(name, durable, owner, autoDelete, queueRegistry, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory); - } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, + boolean autoDelete, VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException { - this(name, durable, owner, autoDelete, queueRegistry, + this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory()); } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, + boolean autoDelete, VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) throws AMQException { @@ -205,18 +180,20 @@ public class AMQQueue implements Managable, Comparable { throw new IllegalArgumentException("Queue name must not be null"); } - if (queueRegistry == null) + if (virtualHost == null) { - throw new IllegalArgumentException("Queue registry must not be null"); + throw new IllegalArgumentException("Virtual Host must not be null"); } _name = name; _durable = durable; _owner = owner; _autoDelete = autoDelete; - _queueRegistry = queueRegistry; + _virtualHost = virtualHost; _asyncDelivery = asyncDelivery; + _managedObject = createMBean(); _managedObject.register(); + _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); @@ -492,10 +469,18 @@ public class AMQQueue implements Managable, Comparable public void delete() throws AMQException { - _subscribers.queueDeleted(this); - _bindings.deregister(); - _queueRegistry.unregisterQueue(_name); - _managedObject.unregister(); + if(!_deleted.getAndSet(true)) + { + _subscribers.queueDeleted(this); + _bindings.deregister(); + _virtualHost.getQueueRegistry().unregisterQueue(_name); + _managedObject.unregister(); + for(Task task : _deleteTaskList) + { + task.doTask(this); + } + _deleteTaskList.clear(); + } } protected void autodelete() throws AMQException @@ -620,6 +605,24 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.performGet(session, channel, acks); } - + public QueueRegistry getQueueRegistry() + { + return _virtualHost.getQueueRegistry(); + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public static interface Task + { + public void doTask(AMQQueue queue) throws AMQException; + } + + public void addQueueDeleteTask(Task task) + { + _deleteTaskList.add(task); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 012b3600ca..ab67012b19 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -20,7 +20,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.Main; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -28,11 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; import javax.management.openmbean.*; -import javax.management.JMException; -import javax.management.Notification; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.OperationsException; +import javax.management.*; import javax.management.monitor.MonitorNotification; import java.util.List; import java.util.ArrayList; @@ -73,6 +71,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); } + + public ManagedObject getParentObject() + { + return _queue.getVirtualHost().getManagedObject(); + } + static { try @@ -373,6 +377,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue return _messageList; } +// +// public ObjectName getObjectName() throws MalformedObjectNameException +// { +// String objNameString = super.getObjectName().toString(); +// +// return new ObjectName(objNameString); +// } + /** * returns Notifications sent by this MBean. diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 8ab26def74..084612ca41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.AMQShortString; import java.util.concurrent.ConcurrentMap; @@ -30,8 +31,16 @@ public class DefaultQueueRegistry implements QueueRegistry { private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - public DefaultQueueRegistry() + private final VirtualHost _virtualHost; + + public DefaultQueueRegistry(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + public VirtualHost getVirtualHost() { + return _virtualHost; } public void registerQueue(AMQQueue queue) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index bfbaf27c84..c5f235f1b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -21,11 +21,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.AMQShortString; public interface QueueRegistry { + VirtualHost getVirtualHost(); + void registerQueue(AMQQueue queue) throws AMQException; void unregisterQueue(AMQShortString name) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 48331843e5..0630d4f39f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.registry; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; import java.util.Iterator; @@ -38,7 +39,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); - private static Map _instanceMap = new HashMap(); + private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>(); private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>(); @@ -62,20 +63,13 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { synchronized (ApplicationRegistry.class) { - Iterator keyIterator = _instanceMap.keySet().iterator(); + Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator(); while (keyIterator.hasNext()) { - int key = (Integer) keyIterator.next(); - IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key); - - if ((instance != null)) - { - if (instance.getMessageStore() != null) - { - instance.getMessageStore().close(); - } - } + IApplicationRegistry instance = keyIterator.next(); + + instance.close(); } } } @@ -118,7 +112,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { try { - ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close(); + _instanceMap.get(instanceID).close(); } catch (Exception e) { @@ -143,7 +137,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static IApplicationRegistry getInstance(int instanceID) { - IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID); + IApplicationRegistry instance = _instanceMap.get(instanceID); if (instance == null) { @@ -168,6 +162,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + public void close() throws Exception + { + for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) + { + virtualHost.close(); + } + } + public Configuration getConfiguration() { return _configuration; @@ -193,6 +195,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return instance; } + + public static void setDefaultApplicationRegistry(String clazz) { _APPLICATION_REGISTRY = clazz; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 1eb490d6fb..790421b497 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -38,22 +38,26 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.SASLAuthenticationManager; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.mina.common.ByteBuffer; import java.io.File; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; private ManagedObjectRegistry _managedObjectRegistry; private AuthenticationManager _authenticationManager; - private MessageStore _messageStore; + private VirtualHostRegistry _virtualHostRegistry; + + + private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>(); + public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { @@ -91,11 +95,19 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { initialiseManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + _virtualHostRegistry = new VirtualHostRegistry(); _authenticationManager = new SASLAuthenticationManager(); - initialiseMessageStore(); + + initialiseVirtualHosts(); + } + + private void initialiseVirtualHosts() throws Exception + { + for(String name : getVirtualHostNames()) + { + + _virtualHostRegistry.registerVirtualHost(new VirtualHost(name,getConfiguration().subset("virtualhosts.virtualhost."+name))); + } } private void initialiseManagedObjectRegistry() @@ -111,34 +123,10 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } } - private void initialiseMessageStore() throws Exception - { - String messageStoreClass = _configuration.getString("store.class"); - Class clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); - if (!(o instanceof MessageStore)) - { - throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz + - " does not."); - } - _messageStore = (MessageStore) o; - _messageStore.configure(getQueueRegistry(), "store", _configuration); - } - - public QueueRegistry getQueueRegistry() + public VirtualHostRegistry getVirtualHostRegistry() { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; + return _virtualHostRegistry; } public ManagedObjectRegistry getManagedObjectRegistry() @@ -151,8 +139,8 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry return _authenticationManager; } - public MessageStore getMessageStore() + public Collection<String> getVirtualHostNames() { - return _messageStore; - } + return getConfiguration().getList("virtualhosts.virtualhost.name"); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index cd664f9a4b..703aed69d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -26,8 +26,12 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.commons.configuration.Configuration; +import java.util.Collection; + public interface IApplicationRegistry { /** @@ -35,7 +39,9 @@ public interface IApplicationRegistry * that need access to the application registry itself for initialisation are able to use it. Attempting to * initialise in the constructor will lead to failures since the registry reference will not have been set. */ - void initialise() throws Exception; + void initialise() throws Exception; + + void close() throws Exception; /** * This gets access to a "configured object". A configured object has fields populated from a the configuration @@ -54,15 +60,11 @@ public interface IApplicationRegistry */ Configuration getConfiguration(); - QueueRegistry getQueueRegistry(); - - ExchangeRegistry getExchangeRegistry(); - - ExchangeFactory getExchangeFactory(); - ManagedObjectRegistry getManagedObjectRegistry(); AuthenticationManager getAuthenticationManager(); - MessageStore getMessageStore(); + Collection<String> getVirtualHostNames(); + + VirtualHostRegistry getVirtualHostRegistry(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 81ce704026..7b8ba1d9cc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.log4j.Logger; import java.util.HashMap; @@ -46,8 +47,8 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final QueueRegistry _queueRegistry; - private final ExchangeRegistry _exchangeRegistry; + + private final VirtualHostRegistry _virtualHostRegistry; private final AMQProtocolSession _protocolSession; /** * The current state @@ -63,15 +64,15 @@ public class AMQStateManager implements AMQMethodListener private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + + public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession); + this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession); } - protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + _virtualHostRegistry = virtualHostRegistry; _protocolSession = protocolSession; _currentState = initial; if (register) @@ -176,7 +177,7 @@ public class AMQStateManager implements AMQMethodListener checkChannel(evt, _protocolSession); - handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); + handler.methodReceived(this, evt); return true; } return false; @@ -241,4 +242,14 @@ public class AMQStateManager implements AMQMethodListener { _stateListeners.remove(listener); } + + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java index 56323258b7..99d5d7fe88 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java @@ -25,6 +25,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.framing.AMQMethodBody; /** @@ -34,7 +35,5 @@ import org.apache.qpid.framing.AMQMethodBody; */ public interface StateAwareMethodListener <B extends AMQMethodBody> { - void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<B> evt) throws AMQException; + void methodReceived(AMQStateManager stateManager, AMQMethodEvent<B> evt) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 98a4c3f6e7..eaaffa2dce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.List; @@ -67,7 +68,7 @@ public class MemoryMessageStore implements MessageStore _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity); } - public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception { configure(base, config); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index c9c7045402..8daad0e5e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -35,13 +36,13 @@ public interface MessageStore /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. - * @param queueRegistry the registry of queues to be used by this store + * @param virtualHost the virtual host using by this store * @param base the base element identifier from which all configuration items are relative. For example, if the base * element is "store", the all elements used by concrete classes will be "store.foo" etc. * @param config the apache commons configuration object * @throws Exception if an error occurs that means the store is unable to configure itself */ - void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception; + void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 2e77f33363..e9a3a3d048 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -33,24 +33,23 @@ import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.NullAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import java.util.HashMap; +import java.util.Collection; +import java.util.Collections; +import java.util.Arrays; public class NullApplicationRegistry extends ApplicationRegistry { - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - private ManagedObjectRegistry _managedObjectRegistry; private AuthenticationManager _authenticationManager; - private MessageStore _messageStore; + private VirtualHostRegistry _virtualHostRegistry; public NullApplicationRegistry() @@ -60,15 +59,16 @@ public class NullApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { + _configuration.addProperty("store.class","org.apache.qpid.server.store.MemoryMessageStore"); + _managedObjectRegistry = new NoopManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + _virtualHostRegistry = new VirtualHostRegistry(); + VirtualHost dummyHost = new VirtualHost("test",getConfiguration()); + _virtualHostRegistry.registerVirtualHost(dummyHost); _authenticationManager = new NullAuthenticationManager(); - _messageStore = new MemoryMessageStore(); - ((MemoryMessageStore)_messageStore).configure(); _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } public Configuration getConfiguration() @@ -76,20 +76,6 @@ public class NullApplicationRegistry extends ApplicationRegistry return _configuration; } - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } public ManagedObjectRegistry getManagedObjectRegistry() { @@ -101,9 +87,15 @@ public class NullApplicationRegistry extends ApplicationRegistry return _authenticationManager; } - public MessageStore getMessageStore() + public Collection<String> getVirtualHostNames() + { + String[] hosts = {"test"}; + return Arrays.asList( hosts ); + } + + public VirtualHostRegistry getVirtualHostRegistry() { - return _messageStore; + return _virtualHostRegistry; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java new file mode 100644 index 0000000000..2c888caac1 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java @@ -0,0 +1,51 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.queue.ManagedQueue;
+import org.apache.qpid.server.exchange.ManagedExchange;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of an Exchange.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the exchange.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java new file mode 100644 index 0000000000..15bad19e58 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -0,0 +1,193 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.management.*;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import javax.management.NotCompliantMBeanException;
+
+public class VirtualHost
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from
+ * management intrerface for exchanges. Any implementaion of an
+ * Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+
+
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ _virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+
+ initialiseMessageStore(hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public void close() throws Exception
+ {
+ if(_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java new file mode 100644 index 0000000000..25f67c1cf3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -0,0 +1,52 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class VirtualHostRegistry
+{
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+ public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+ {
+ if(_registry.containsKey(host.getName()))
+ {
+ throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+ }
+ _registry.put(host.getName(),host);
+ }
+
+ public VirtualHost getVirtualHost(String name)
+ {
+ return _registry.get(name);
+ }
+
+ public Collection<VirtualHost> getVirtualHosts()
+ {
+ return new ArrayList<VirtualHost>(_registry.values());
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index a155117a7f..c82afd3906 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -156,7 +156,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName == null ? "" : clientName) + + (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'")); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index c134c2093b..b32a0ffdc4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -114,6 +114,10 @@ public class AMQConnectionURL implements ConnectionURL if (virtualHost != null && (!virtualHost.equals(""))) { + if(virtualHost.startsWith("/")) + { + virtualHost = virtualHost.substring(1); + } setVirtualHost(virtualHost); } else diff --git a/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java b/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java index d89bc4a771..f59b36166a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java +++ b/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java @@ -46,7 +46,7 @@ public class TestManyConnections extends TestCase long startTime = System.currentTimeMillis(); for (int i = 0; i < count; i++) { - createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "/test"); + createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "test"); } long endTime = System.currentTimeMillis(); _log.info("Time to create " + count + " connections: " + (endTime - startTime) + diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 39a2e9c627..6b03dd32e8 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -76,7 +76,7 @@ public class MessageListenerMultiConsumerTest extends TestCase Hashtable<String, String> env = new Hashtable<String, String>(); - env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'"); + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); _context = factory.getInitialContext(env); diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index b99593aaa5..01c3d30314 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -70,7 +70,7 @@ public class MessageListenerTest extends TestCase implements MessageListener Hashtable<String, String> env = new Hashtable<String, String>(); - env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'"); + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); _context = factory.getInitialContext(env); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 234b4c8a67..2d69b4fb82 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -49,7 +49,7 @@ public class RecoverTest extends TestCase public void testRecoverResendsMsgs() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); @@ -57,7 +57,7 @@ public class RecoverTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -106,7 +106,7 @@ public class RecoverTest extends TestCase public void testRecoverResendsMsgsAckOnEarlier() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); @@ -114,7 +114,7 @@ public class RecoverTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -170,7 +170,7 @@ public class RecoverTest extends TestCase public void testAcknowledgePerConsumer() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); @@ -178,7 +178,7 @@ public class RecoverTest extends TestCase MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); MessageProducer producer2 = producerSession.createProducer(queue2); @@ -209,7 +209,7 @@ public class RecoverTest extends TestCase public void testRecoverInAutoAckListener() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index 59be38f0dd..cf5b5c76e5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -48,7 +48,7 @@ public class BytesMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index 5af55d6625..d1e90e7bcd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -54,7 +54,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index bc2def1c64..0d283aa0d9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -56,7 +56,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 1e9de221d4..66d82a991e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -41,7 +41,7 @@ public class MultipleConnectionTest extends TestCase Receiver(String broker, AMQDestination dest, int sessions) throws Exception { - this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions); + this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest, sessions); } Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception @@ -72,7 +72,7 @@ public class MultipleConnectionTest extends TestCase Publisher(String broker, AMQDestination dest) throws Exception { - this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest); + this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest); } Publisher(AMQConnection connection, AMQDestination dest) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index 3f726ae5ab..dc1aadaa6c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -50,7 +50,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener TransportConnection.createVMBroker(1); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 7423a3d8f0..d0126e1917 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -53,7 +53,7 @@ public class PropertyValueTest extends TestCase implements MessageListener super.setUp(); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index f4814795c4..38b33f4b18 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -51,11 +51,11 @@ public class PubSubTwoConnectionTest extends TestCase public void testTwoConnections() throws Exception { Topic topic = new AMQTopic("MyTopic"); - Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "/test_path"); + Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageProducer producer = session1.createProducer(topic); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "/test_path"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer = session2.createConsumer(topic); con2.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java index 302551b05c..1db62cffa9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java @@ -48,7 +48,7 @@ public class ReceiveTest extends TestCase { createVMBroker(); String broker = _connectionString; - init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path")); + init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "test")); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index 27a2ccb32e..fe15e151a3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -51,7 +51,7 @@ public class SelectorTest extends TestCase implements MessageListener { super.setUp(); TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java index 726c7e39d7..cce02accd8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java @@ -43,7 +43,7 @@ public class SessionStartTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 81481bc94d..b50cd39780 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -52,7 +52,7 @@ public class TextMessageTest extends TestCase implements MessageListener super.setUp(); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index c4b60be1d1..db4e18a4a1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -42,7 +42,7 @@ public class AMQConnectionTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); - _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test"); + _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); _topic = new AMQTopic("mytopic"); _queue = new AMQQueue("myqueue"); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index 67c4f1dd6b..b01a129bf2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -46,7 +46,7 @@ public class AMQSessionTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test"); + _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); _topic = new AMQTopic("mytopic"); _queue = new AMQQueue("myqueue"); _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index d84d66e26d..05d83be47f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -65,7 +65,7 @@ public class ChannelCloseOkTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); - _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"); + _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"); _destination1 = new AMQQueue("q1", true); _destination2 = new AMQQueue("q2", true); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index 0b3ed931f8..7a665daeb3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -49,7 +49,7 @@ public class CloseWithBlockingReceiveTest extends TestCase public void testReceiveReturnsNull() throws Exception { final Connection connection = new AMQConnection("vm://:1", "guest", "guest", - "fred", "/test"); + "fred", "test"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(new AMQTopic("banana")); connection.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 0bfe8dbddf..8441799990 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -55,7 +55,7 @@ public class ConnectionTest extends TestCase { try { - new AMQConnection(_broker, "guest", "guest", "fred", "/test"); + new AMQConnection(_broker, "guest", "guest", "fred", "test"); } catch (Exception e) { @@ -115,7 +115,7 @@ public class ConnectionTest extends TestCase public void testClientIdCannotBeChanged() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", - "fred", "/test"); + "fred", "test"); try { connection.setClientID("someClientId"); @@ -130,7 +130,7 @@ public class ConnectionTest extends TestCase public void testClientIdIsPopulatedAutomatically() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", - null, "/test"); + null, "test"); assertNotNull(connection.getClientID()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 8e67f97787..c6dee1d9bf 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -33,14 +33,14 @@ public class ConnectionURLTest extends TestCase public void testFailoverURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 2); @@ -60,14 +60,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportUsernamePasswordURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -80,14 +80,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportUsernameBlankPasswordURL() throws URLSyntaxException { - String url = "amqp://ritchiem:@/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://ritchiem:@/test?brokerlist='tcp://localhost:5672'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -100,7 +100,7 @@ public class ConnectionURLTest extends TestCase public void testFailedURLNullPassword() { - String url = "amqp://ritchiem@/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://ritchiem@/test?brokerlist='tcp://localhost:5672'"; try { @@ -125,7 +125,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/test")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -140,7 +140,7 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportWithClientURLURL() throws URLSyntaxException { - String url = "amqp://guest:guest@clientname/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://guest:guest@clientname/test?brokerlist='tcp://localhost:5672'"; ConnectionURL connectionurl = new AMQConnectionURL(url); @@ -148,7 +148,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getClientName().equals("clientname")); @@ -164,14 +164,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransport1OptionURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim'"; + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -187,14 +187,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportDefaultedBroker() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='localhost'"; + String url = "amqp://guest:guest@/test?brokerlist='localhost'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -209,14 +209,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportDefaultedBrokerWithPort() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='localhost:1234'"; + String url = "amqp://guest:guest@/test?brokerlist='localhost:1234'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -231,14 +231,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportDefaultedBrokerWithIP() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='127.0.0.1'"; + String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -253,7 +253,7 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='127.0.0.1:1234'"; + String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'"; // ConnectionURL connectionurl = new AMQConnectionURL(url); // @@ -276,14 +276,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportMultiOptionURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'"; + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -301,14 +301,14 @@ public class ConnectionURLTest extends TestCase public void testSinglevmURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/messages?brokerlist='vm://:2'"; + String url = "amqp://guest:guest@/test?brokerlist='vm://:2'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/messages")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -322,14 +322,14 @@ public class ConnectionURLTest extends TestCase public void testFailoverVMURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/temp?brokerlist='vm://:2;vm://:3',failover='roundrobin'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 2); @@ -369,7 +369,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getUsername().equals("user")); assertTrue(connectionurl.getPassword().equals("")); - assertTrue(connectionurl.getVirtualHost().equals("/test")); + assertTrue(connectionurl.getVirtualHost().equals("test")); assertTrue(connectionurl.getBrokerCount() == 1); } @@ -428,7 +428,7 @@ public class ConnectionURLTest extends TestCase String url = "amqp://guest:guest@/t.-_+!=:?brokerlist='tcp://localhost:5672'"; AMQConnectionURL connection = new AMQConnectionURL(url); - assertTrue(connection.getVirtualHost().equals("/t.-_+!=:")); + assertTrue(connection.getVirtualHost().equals("t.-_+!=:")); } public void testCheckDefaultPort() throws URLSyntaxException diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index 6c2c684362..f12400c7b1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -87,7 +87,7 @@ public class Client implements MessageListener static AMQConnection connect(String broker) throws Exception { - return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path"); + return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); } public static void main(String[] argv) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java index a1c64e2246..58f9c6fc19 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -73,7 +73,7 @@ public class Service implements MessageListener static AMQConnection connect(String broker) throws Exception { - return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path"); + return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); } // public static void main(String[] argv) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index bbd1870168..0710605db9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -54,7 +54,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path"); + connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test"); destination = new AMQQueue(randomize("LatencyTest"), true); session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 6c064e3853..b6c539d91c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -28,7 +28,7 @@ public class TemporaryQueueTest extends TestCase protected Connection createConnection() throws AMQException, URLSyntaxException
{
return new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
}
public void testTempoaryQueue() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java index c9240e9be7..7cbd4e8bdd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java @@ -51,7 +51,7 @@ public class TopicPublisherCloseTest extends TestCase public void testAllMethodsThrowAfterConnectionClose() throws Exception { - AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "/test_path"); + AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test"); Topic destination1 = new AMQTopic("t1"); TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java index 23e3b9cc88..1f53d7de65 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -40,12 +40,12 @@ public class JMSDestinationTest extends TestCase public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 4f0ca6d3aa..7d83d19d74 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -59,7 +59,7 @@ public class StreamMessageTest extends TestCase public void testStreamMessageEOF() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -72,7 +72,7 @@ public class StreamMessageTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -113,7 +113,7 @@ public class StreamMessageTest extends TestCase public void testModifyReceivedMessageExpandsBuffer() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue = new AMQQueue("testQ"); MessageConsumer consumer = consumerSession.createConsumer(queue); @@ -135,7 +135,7 @@ public class StreamMessageTest extends TestCase } } }); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer mandatoryProducer = producerSession.createProducer(queue); con.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 8263e7f126..7e645f1a26 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -55,7 +55,7 @@ public class DurableSubscriptionTest extends TestCase public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); @@ -96,7 +96,7 @@ public class DurableSubscriptionTest extends TestCase public void testDurability() throws AMQException, JMSException, URLSyntaxException { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java index 4ffb3e8459..c4acf15a58 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java @@ -48,7 +48,7 @@ public class TopicPublisherTest extends TestCase public void testUnidentifiedProducer() throws Exception { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(null); MessageConsumer consumer1 = session1.createConsumer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 8a6e279142..84c7a61a56 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -51,7 +51,7 @@ public class TopicSessionTest extends TestCase public void testTopicSubscriptionUnsubscription() throws Exception { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0"); TopicPublisher publisher = session1.createPublisher(topic); @@ -97,7 +97,7 @@ public class TopicSessionTest extends TestCase { AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown)); AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown)); - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(null); @@ -112,7 +112,7 @@ public class TopicSessionTest extends TestCase { session1.close(); con.close(); - con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); con.start(); session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); publisher = session1.createPublisher(null); @@ -134,11 +134,11 @@ public class TopicSessionTest extends TestCase public void testUnsubscriptionAfterConnectionClose() throws Exception { AMQTopic topic = new AMQTopic("MyTopic3"); - AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); - AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test"); TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); @@ -149,7 +149,7 @@ public class TopicSessionTest extends TestCase assertNotNull(tm); con2.close(); publisher.publish(session1.createTextMessage("Hello2")); - con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test"); session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); @@ -163,7 +163,7 @@ public class TopicSessionTest extends TestCase public void testTextMessageCreation() throws Exception { AMQTopic topic = new AMQTopic("MyTopic4"); - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); @@ -202,7 +202,7 @@ public class TopicSessionTest extends TestCase public void testSendingSameMessage() throws Exception { - AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); @@ -224,7 +224,7 @@ public class TopicSessionTest extends TestCase public void testTemporaryTopic() throws Exception { - AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index bbad5862a0..18b72e5538 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -61,7 +61,7 @@ public class TransactedTest extends TestCase queue1 = new AMQQueue("Q1", false); queue2 = new AMQQueue("Q2", false); - con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test"); + con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); session = con.createSession(true, 0); consumer1 = session.createConsumer(queue1); //Dummy just to create the queue. @@ -70,7 +70,7 @@ public class TransactedTest extends TestCase producer2 = session.createProducer(queue2); con.start(); - prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test"); + prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); prepProducer1 = prepSession.createProducer(queue1); prepCon.start(); @@ -81,7 +81,7 @@ public class TransactedTest extends TestCase prepProducer1.send(prepSession.createTextMessage("B")); prepProducer1.send(prepSession.createTextMessage("C")); - testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test"); + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer1 = testSession.createConsumer(queue1); testConsumer2 = testSession.createConsumer(queue2); @@ -142,7 +142,7 @@ public class TransactedTest extends TestCase public void testResendsMsgsAfterSessionClose() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue3 = new AMQQueue("Q3", false); @@ -150,7 +150,7 @@ public class TransactedTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue3); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index 352928b121..80f9ef62b1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -46,7 +46,7 @@ class ClusterBuilder ServerHandlerRegistry getHandlerRegistry() { - return new ServerHandlerRegistry(getHandlerFactory(), null, null, null); + return new ServerHandlerRegistry(getHandlerFactory(), null, null); } private MethodHandlerFactory getHandlerFactory() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index c1306b4c13..8419ec5668 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import java.net.InetSocketAddress; @@ -55,13 +56,8 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements } public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address) - { - this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address); - } - - public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address) - { - super(queueRegistry, exchangeRegistry); + { + super(registry); ClusterBuilder builder = new ClusterBuilder(address); _groupMgr = builder.getGroupManager(); _handlers = builder.getHandlerRegistry(); @@ -74,9 +70,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements _handlers = handler._handlers; } - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException { - new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession)); + new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession)); } void connect(String join) throws Exception diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 04c5f7b451..fc635cc7ea 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -24,6 +24,8 @@ import org.apache.mina.common.IoSession; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -37,11 +39,11 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession { private MemberHandle _peer; - public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException + public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException // public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, // ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException { - super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager); + super(session, virtualHostRegistry, codecFactory, stateManager); // super(session, queueRegistry, exchangeRegistry, codecFactory); } @@ -66,7 +68,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession AMQChannel channel = super.getChannel(channelId); if (isPeerSession() && channel == null) { - channel = new OneUseChannel(channelId); + channel = new OneUseChannel(channelId, getVirtualHost()); addChannel(channel); } return channel; @@ -102,18 +104,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession */ private class OneUseChannel extends AMQChannel { - public OneUseChannel(int channelId) - throws AMQException - { - this(channelId, ApplicationRegistry.getInstance()); - } - - public OneUseChannel(int channelId, IApplicationRegistry registry) + public OneUseChannel(int channelId, VirtualHost virtualHost) throws AMQException { super(channelId, - registry.getMessageStore(), - registry.getExchangeRegistry()); + virtualHost.getMessageStore(), + virtualHost.getExchangeRegistry()); } protected void routeCurrentMessage() throws AMQException diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 27d5629f27..03b0dc7f2e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import java.util.HashMap; import java.util.Map; @@ -43,23 +44,20 @@ class ServerHandlerRegistry extends AMQStateManager private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, - AMQProtocolSession protocolSession) + ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); + super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession); } - ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - this(queueRegistry, exchangeRegistry, protocolSession); + this(virtualHostRegistry, protocolSession); _handlers.putAll(s._handlers); } - ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - this(queueRegistry, exchangeRegistry, protocolSession); + this(virtualHostRegistry, protocolSession); init(factory); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java index c4107a435b..86710e8a31 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; @@ -54,19 +55,19 @@ public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends Clust } } - protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { for(ClusterMethodHandler<A> handler : _handlers) { - handler.peer(stateMgr, queues, exchanges, session, evt); + handler.peer(stateMgr, evt); } } - protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { for(ClusterMethodHandler<A> handler : _handlers) { - handler.client(stateMgr, queues, exchanges, session, evt); + handler.client(stateMgr, evt); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index 27d3e28b88..c9f6dbfb37 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.log4j.Logger; @@ -79,22 +80,22 @@ class ChannelQueueManager private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { set(evt.getChannelId(), evt.getMethod().queue); } } private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException { if(evt.getMethod().queue == null) { @@ -104,11 +105,11 @@ class ChannelQueueManager } private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { if(evt.getMethod().queue == null) { @@ -119,11 +120,11 @@ class ChannelQueueManager private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { if(evt.getMethod().queue == null) { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java index 971fa5393b..faab99b0f6 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -32,18 +32,20 @@ import org.apache.qpid.AMQException; public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> { - public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { + AMQProtocolSession session = stateMgr.getProtocolSession(); + if (ClusteredProtocolSession.isPeerSession(session)) { - peer(stateMgr, queues, exchanges, session, evt); + peer(stateMgr, evt); } else { - client(stateMgr, queues, exchanges, session, evt); + client(stateMgr, evt); } } - protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; - protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index 483096f29d..cd897671cc 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -135,19 +135,15 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterSynchBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException { - _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session)); + _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession())); } } private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterJoinBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException { _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker)); } @@ -155,9 +151,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException { _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker)); } @@ -165,9 +159,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException { _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker)); } @@ -175,9 +167,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException { ClusterMembershipBody body = evt.getMethod(); _groupMgr.handleMembershipAnnouncement(new String(body.members)); @@ -186,16 +176,14 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class PingHandler implements StateAwareMethodListener<ClusterPingBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterPingBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException { MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker); _groupMgr.handlePing(peer, evt.getMethod().load); if (evt.getMethod().responseRequired) { evt.getMethod().load = _loadTable.getLocalLoad(); - session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); + stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); } } } @@ -207,12 +195,12 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory super(ConnectionOpenMethodHandler.getInstance()); } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt) + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt) { AMQShortString capabilities = evt.getMethod().capabilities; if (ClusterCapability.contains(capabilities)) { - ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities)); + ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities)); } else { @@ -228,9 +216,9 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory super(ConnectionCloseMethodHandler.getInstance()); } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt) + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt) { - if (!ClusteredProtocolSession.isPeerSession(session)) + if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession())) { _loadTable.decrementLocalLoad(); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java index 7eb3d7291c..a2f62f714b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -38,18 +38,18 @@ class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListen _base = base; } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - preHandle(stateMgr, session, evt); - _base.methodReceived(stateMgr, queues, exchanges, session, evt); - postHandle(stateMgr, session, evt); + preHandle(stateMgr, evt); + _base.methodReceived(stateMgr, evt); + postHandle(stateMgr, evt); } - void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java index 6b876095a4..f01a8349f2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.queue.ClusteredQueue; import org.apache.qpid.server.queue.PrivateQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.RemoteQueueProxy; +import org.apache.qpid.server.virtualhost.VirtualHost; public class LocalQueueDeclareHandler extends QueueDeclareHandler { @@ -51,7 +52,7 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails()); } - protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException + protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException { //is it private or shared: if (body.exclusive) @@ -61,18 +62,18 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler //need to get peer from the session... MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer)); - return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, registry); + return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost); } else { _logger.debug(new LogMessage("Creating local private queue {0}", body.queue)); - return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry); + return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost); } } else { _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue)); - return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry); + return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java index 7f19569dbc..8b0bb4b127 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java @@ -31,7 +31,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener; public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException { } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java index 150b707071..447e51ccd9 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -47,14 +47,14 @@ public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _client = client; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - _peer.methodReceived(stateMgr, queues, exchanges, session, evt); + _peer.methodReceived(stateMgr, evt); } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - _client.methodReceived(stateMgr, queues, exchanges, session, evt); + _client.methodReceived(stateMgr, evt); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java index 6668faca65..a669171d3c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java @@ -41,12 +41,11 @@ class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody> _handler = handler; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, - AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { setName(evt.getMethod());//need to set the name before propagating this method diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java index 0699678c9f..f09763e1ad 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -32,15 +32,21 @@ import org.apache.qpid.server.queue.ClusteredQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody> { private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + //By convention, consumers setup between brokers use the queue name as the consumer tag: - AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag); + AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index c58ae291dd..073b13688c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.ClusteredQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; /** * Handles consume requests from other cluster members. @@ -42,9 +43,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu { private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { - AMQQueue queue = queues.getQueue(evt.getMethod().queue); + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java index 26a4967417..897f8e4fb7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody> { @@ -46,17 +47,22 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo super(groupMgr, base(), policy); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + //only replicate if the queue in question is a shared queue - if (isShared(queues.getQueue(evt.getMethod().queue))) + if (isShared(queueRegistry.getQueue(evt.getMethod().queue))) { - super.replicate(stateMgr, queues, exchanges, session, evt); + super.replicate(stateManager, evt); } else { _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod())); - local(stateMgr, queues, exchanges, session, evt); + local(stateManager, evt); _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod())); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index 84f97e7f59..888fa4e426 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -60,52 +61,51 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _policy = policy; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException { - local(stateMgr, queues, exchanges, session, evt); + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + local(stateManager, evt); _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod())); } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - replicate(stateMgr, queues, exchanges, session, evt); + replicate(stateMgr, evt); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { if (_policy == null) { //asynch delivery _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod())); - local(stateMgr, queues, exchanges, session, evt); + local(stateMgr, evt); } else { - Callback callback = new Callback(stateMgr, queues, exchanges, session, evt); + Callback callback = new Callback(stateMgr, evt); _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback); } _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); } - protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - _base.methodReceived(stateMgr, queues, exchanges, session, evt); + _base.methodReceived(stateMgr, evt); } private class Callback implements GroupResponseHandler { private final AMQStateManager _stateMgr; - private final QueueRegistry _queues; - private final ExchangeRegistry _exchanges; - private final AMQProtocolSession _session; private final AMQMethodEvent<A> _evt; - Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) + Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt) { _stateMgr = stateMgr; - _queues = queues; - _exchanges = exchanges; - _session = session; _evt = evt; } @@ -113,7 +113,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A { try { - local(_stateMgr, _queues, _exchanges, _session, _evt); + local(_stateMgr, _evt); _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod())); } catch (AMQException e) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java index 2561da36a8..8b0c638d63 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -42,11 +42,11 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho _primary = check(primary); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException + public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException { - _pre.methodReceived(stateMgr, queues, exchanges, session, evt); - _primary.methodReceived(stateMgr, queues, exchanges, session, evt); - _post.methodReceived(stateMgr, queues, exchanges, session, evt); + _pre.methodReceived(stateMgr, evt); + _primary.methodReceived(stateMgr, evt); + _post.methodReceived(stateMgr, evt); } private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index ad9de8d93f..8ac4b9b2c7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.HashMap; @@ -73,8 +74,11 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); AMQMethodBody request = evt.getMethod(); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 5cf6d5c3ff..19be638051 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -46,22 +47,14 @@ public class ClusteredQueue extends AMQQueue private final GroupManager _groupMgr; private final NestedSubscriptionManager _subscriptions; - public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) + public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager()); + super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager()); _groupMgr = groupMgr; _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); } - public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(), - new SubscriptionImpl.Factory()); - _groupMgr = groupMgr; - _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); - } public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 568de62d1b..95ab34ccf9 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleBodySendable; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.AMQShortString; @@ -37,21 +38,14 @@ public class PrivateQueue extends AMQQueue { private final GroupManager _groupMgr; - public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) + public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - super(name, durable, owner, autoDelete, queueRegistry); + super(name, durable, owner, autoDelete, virtualHost); _groupMgr = groupMgr; } - public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); - _groupMgr = groupMgr; - } - protected void autodelete() throws AMQException { //delete locally: diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index a6cce05a03..d0a64c7d6f 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.MemberHandle; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.concurrent.Executor; @@ -43,23 +44,15 @@ public class RemoteQueueProxy extends AMQQueue private final MemberHandle _target; private final GroupManager _groupMgr; - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - super(name, durable, owner, autoDelete, queueRegistry); + super(name, durable, owner, autoDelete, virtualHost); _target = target; _groupMgr = groupMgr; _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); } - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); - _target = target; - _groupMgr = groupMgr; - _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); - } public void deliver(AMQMessage msg) throws NoConsumersException { diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 57159f3802..172f1b1790 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -39,4 +39,10 @@ public class ExchangeDefaults public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); + + + public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt"); + + public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt"); + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index b891c914ec..ed6ab63ded 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -531,6 +531,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties return _replyTo == null ? null : _replyTo.toString(); } + public AMQShortString getReplyToAsShortString() + { + decodeIfNecessary(); + return _replyTo; + } + + public void setReplyTo(String replyTo) { setReplyTo(replyTo == null ? null : new AMQShortString(replyTo)); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java index 756d404596..8817f6c2c5 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java @@ -32,7 +32,7 @@ public class Constants public final static String ITEM_TYPE = "type"; public final static String SERVER = "server"; public final static String DOMAIN = "domain"; - public final static String TYPE = "mbeantype"; + public final static String NODE_TYPE_MBEANTYPE = "mbeantype"; public final static String MBEAN = "mbean"; public final static String ATTRIBUTE = "Attribute"; public final static String ATTRIBUTES = "Attributes"; @@ -45,13 +45,14 @@ public class Constants public final static String NAVIGATION_ROOT = "Qpid Connections"; public final static String DESCRIPTION = " Description"; - - public final static String BROKER_MANAGER = "Broker_Manager"; - public final static String QUEUE = "Queue"; - public final static String EXCHANGE = "Exchange"; + + public final static String VIRTUAL_HOST = "VirtualHost"; + public final static String MBEAN_TYPE_BROKER_MANAGER = "VirtualHost.BrokerManager"; + public final static String MBEAN_TYPE_QUEUE = "VirtualHost.Queue"; + public final static String MBEAN_TYPE_EXCHANGE = "VirtualHost.Exchange"; public final static String EXCHANGE_TYPE = "ExchangeType"; public final static String[] EXCHANGE_TYPE_VALUES = {"direct", "topic", "headers"}; - public final static String CONNECTION ="Connection"; + public final static String MBEAN_TYPE_CONNECTION ="Connection"; public final static String ACTION_ADDSERVER = "New Connection"; @@ -87,4 +88,13 @@ public class Constants public final static int OPERATION_IMPACT_ACTION = 1; public final static int OPERATION_IMPACT_ACTIONINFO = 2; public final static int OPERATION_IMPACT_UNKNOWN = 3; + public static final String NODE_TYPE_VIRTUAL_HOST = "virtualhost"; + public static final String NODE_LABEL_QUEUES = "queues"; + public static final String NODE_LABEL_EXCHANGES = "exchanges"; + public static final String NODE_LABEL_CONNECTIONS = "connections"; + + public static final String NODE_LABEL_VIRTUAL_HOSTS = "virtual hosts"; + public static final String TAB_LABEL_QUEUES = "queues"; + public static final String TAB_LABEL_EXCHANGES = "exchanges"; + public static final String TAB_LABEL_CONNECTIONS = "connections"; } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java index 6fbfdcd06f..cc106c445b 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java @@ -23,6 +23,10 @@ package org.apache.qpid.management.ui; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.management.ui.jmx.ClientListener; import org.apache.qpid.management.ui.model.ManagedAttributeModel; @@ -33,11 +37,11 @@ public abstract class ServerRegistry { private ManagedServer _managedServer = null; // list of all Connection mbeans - protected List<ManagedBean> _connections = new ArrayList<ManagedBean>(); + protected ConcurrentMap<String,List<ManagedBean>> _connections = new ConcurrentHashMap<String,List<ManagedBean>>(); // list of all exchange mbeans - protected List<ManagedBean> _exchanges = new ArrayList<ManagedBean>(); + protected ConcurrentMap<String,List<ManagedBean>> _exchanges = new ConcurrentHashMap<String,List<ManagedBean>>(); // list of all queue mbenas - protected List<ManagedBean> _queues = new ArrayList<ManagedBean>(); + protected ConcurrentMap<String,List<ManagedBean>> _queues = new ConcurrentHashMap<String,List<ManagedBean>>(); public ServerRegistry() { @@ -61,47 +65,68 @@ public abstract class ServerRegistry protected void addConnectionMBean(ManagedBean mbean) { - _connections.add(mbean); + String virtualHostName = mbean.getProperty("VirtualHost"); + _connections.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>()); + List<ManagedBean> beans = _connections.get(virtualHostName); + beans.add(mbean); } protected void addExchangeMBean(ManagedBean mbean) { - _exchanges.add(mbean); + String virtualHostName = mbean.getProperty("VirtualHost"); + _exchanges.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>()); + List<ManagedBean> beans = _exchanges.get(virtualHostName); + beans.add(mbean); } protected void addQueueMBean(ManagedBean mbean) { - _queues.add(mbean); + String virtualHostName = mbean.getProperty("VirtualHost"); + _queues.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>()); + List<ManagedBean> beans = _queues.get(virtualHostName); + beans.add(mbean); } protected void removeConnectionMBean(ManagedBean mbean) { - _connections.remove(mbean); + String virtualHostName = mbean.getProperty("VirtualHost"); + _connections.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>()); + List<ManagedBean> beans = _connections.get(virtualHostName); + beans.remove(mbean); } protected void removeExchangeMBean(ManagedBean mbean) { - _exchanges.remove(mbean); + String virtualHostName = mbean.getProperty("VirtualHost"); + _exchanges.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>()); + List<ManagedBean> beans = _exchanges.get(virtualHostName); + beans.remove(mbean); } protected void removeQueueMBean(ManagedBean mbean) { - _queues.remove(mbean); + String virtualHostName = mbean.getProperty("VirtualHost"); + _queues.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>()); + List<ManagedBean> beans = _queues.get(virtualHostName); + beans.remove(mbean); } - public List<ManagedBean> getConnections() + public List<ManagedBean> getConnections(String virtualHost) { - return _connections; + _connections.putIfAbsent(virtualHost, new ArrayList<ManagedBean>()); + return _connections.get(virtualHost); } - public List<ManagedBean> getExchanges() + public List<ManagedBean> getExchanges(String virtualHost) { - return _exchanges; + _exchanges.putIfAbsent(virtualHost, new ArrayList<ManagedBean>()); + return _exchanges.get(virtualHost); } - public List<ManagedBean> getQueues() + public List<ManagedBean> getQueues(String virtualHost) { - return _queues; + _queues.putIfAbsent(virtualHost, new ArrayList<ManagedBean>()); + return _queues.get(virtualHost); } public abstract void addManagedObject(ManagedBean key); @@ -123,11 +148,11 @@ public abstract class ServerRegistry public abstract OperationDataModel getOperationModel(ManagedBean mbean); - public abstract String[] getQueueNames(); + public abstract String[] getQueueNames(String virtualHost); - public abstract String[] getExchangeNames(); + public abstract String[] getExchangeNames(String virtualHost); - public abstract String[] getConnectionNames(); + public abstract String[] getConnectionNames(String virtualHost); public abstract List<NotificationObject> getNotifications(ManagedBean mbean); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java index 00a9ae7653..6a23051a9e 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java @@ -43,8 +43,6 @@ public class ClientListener implements NotificationListener public void handleNotification(Notification notification, Object handback) { - System.out.println("\nReceived server notification: " + notification); - ObjectName objName = null; String type = notification.getType(); if (MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(type)) diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java index 31b761fcf3..c6ecda4b4c 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java @@ -34,7 +34,6 @@ public class ClientNotificationListener extends ClientListener public void handleNotification(Notification notification, Object handback) { - System.out.println("\nReceived mbean notification: " + notification); ObjectName objName = (ObjectName)notification.getSource(); //String type = notification.getType(); getServerRegistry().addNotification(objName, notification); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java index c087bd2e72..727e1228f5 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java @@ -120,7 +120,8 @@ public class JMXServerRegistry extends ServerRegistry } catch (ListenerNotFoundException ex) { - System.out.println(ex.toString()); + System.err.println(ex); + ex.printStackTrace(); } } @@ -131,15 +132,15 @@ public class JMXServerRegistry extends ServerRegistry public void addManagedObject(ManagedBean mbean) { - if (Constants.QUEUE.equals(mbean.getType()) && !mbean.getName().startsWith("tmp_")) + if (Constants.MBEAN_TYPE_QUEUE.equals(mbean.getType()) && !mbean.getName().startsWith("tmp_")) { addQueueMBean(mbean); } - else if (Constants.EXCHANGE.equals(mbean.getType())) + else if (Constants.MBEAN_TYPE_EXCHANGE.equals(mbean.getType())) { addExchangeMBean(mbean); } - else if (Constants.CONNECTION.equals(mbean.getType())) + else if (Constants.MBEAN_TYPE_CONNECTION.equals(mbean.getType())) { addConnectionMBean(mbean); } @@ -149,11 +150,11 @@ public class JMXServerRegistry extends ServerRegistry public void removeManagedObject(ManagedBean mbean) { - if (Constants.QUEUE.equals(mbean.getType())) + if (Constants.MBEAN_TYPE_QUEUE.equals(mbean.getType())) removeQueueMBean(mbean); - else if (Constants.EXCHANGE.equals(mbean.getType())) + else if (Constants.MBEAN_TYPE_EXCHANGE.equals(mbean.getType())) removeExchangeMBean(mbean); - else if (Constants.CONNECTION.equals(mbean.getType())) + else if (Constants.MBEAN_TYPE_CONNECTION.equals(mbean.getType())) removeConnectionMBean(mbean); _mbeansMap.remove(mbean.getUniqueName()); @@ -247,7 +248,6 @@ public class JMXServerRegistry extends ServerRegistry list.add(type); } - System.out.println("Subscribed for notification :" + mbean.getUniqueName()); } public boolean hasSubscribedForNotifications(ManagedBean mbean, String name, String type) @@ -268,7 +268,6 @@ public class JMXServerRegistry extends ServerRegistry public void removeNotificationListener(ManagedBean mbean, String name, String type) throws Exception { - System.out.println("Removed notification listener :" + mbean.getUniqueName() + name +type); if (_subscribedNotificationMap.containsKey(mbean.getUniqueName())) { HashMap<String, List<String>> map = _subscribedNotificationMap.get(mbean.getUniqueName()); @@ -335,37 +334,40 @@ public class JMXServerRegistry extends ServerRegistry return _operationModelMap.get(mbean.getUniqueName()); } - public String[] getQueueNames() + public String[] getQueueNames(String virtualHost) { - String[] queues = new String[_queues.size()]; + List<ManagedBean> queues = _queues.get(virtualHost); + String[] queueNames = new String[queues.size()]; int i = 0; - for (ManagedBean mbean : _queues) + for (ManagedBean mbean : queues) { - queues[i++] = mbean.getName(); + queueNames[i++] = mbean.getName(); } - return queues; + return queueNames; } - public String[] getExchangeNames() + public String[] getExchangeNames(String virtualHost) { - String[] exchanges = new String[_exchanges.size()]; + List<ManagedBean> exchanges = _exchanges.get(virtualHost); + String[] exchangeNames = new String[exchanges.size()]; int i = 0; - for (ManagedBean mbean : _exchanges) + for (ManagedBean mbean : exchanges) { - exchanges[i++] = mbean.getName(); + exchangeNames[i++] = mbean.getName(); } - return exchanges; + return exchangeNames; } - public String[] getConnectionNames() + public String[] getConnectionNames(String virtualHost) { - String[] connections = new String[_connections.size()]; + List<ManagedBean> connections = _connections.get(virtualHost); + String[] connectionNames = new String[connections.size()]; int i = 0; - for (ManagedBean mbean : _connections) + for (ManagedBean mbean : connections) { - connections[i++] = mbean.getName(); + connectionNames[i++] = mbean.getName(); } - return connections; + return connectionNames; } public ClientNotificationListener getNotificationListener() diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java index 73d56634ec..d8d76058a5 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java @@ -91,7 +91,7 @@ public class MBeanTypeTabControl for (int i = 0; i < selectedItems.length; i++) { String name = selectedItems[i];; - if (Constants.QUEUE.equals(_type)) + if (Constants.MBEAN_TYPE_QUEUE.equals(_type)) { int endIndex = name.lastIndexOf("("); name = name.substring(0, endIndex -1); @@ -231,21 +231,21 @@ public class MBeanTypeTabControl java.util.List<ManagedBean> list = null; // populate the map and list with appropriate mbeans - if (_type.equals(Constants.QUEUE)) + if (_type.equals(Constants.MBEAN_TYPE_QUEUE) || _type.equals(Constants.NODE_LABEL_QUEUES)) { - list = serverRegistry.getQueues(); + list = serverRegistry.getQueues(MBeanView.getVirtualHostName()); items = getQueueItems(list); _sortBySizeButton.setVisible(true); } - else if (_type.equals(Constants.EXCHANGE)) + else if (_type.equals(Constants.MBEAN_TYPE_EXCHANGE) || _type.equals(Constants.NODE_LABEL_EXCHANGES)) { - list = serverRegistry.getExchanges(); + list = serverRegistry.getExchanges(MBeanView.getVirtualHostName()); items = getItems(list); _sortBySizeButton.setVisible(false); } - else if (_type.equals(Constants.CONNECTION)) + else if (_type.equals(Constants.MBEAN_TYPE_CONNECTION) || _type.equals(Constants.NODE_LABEL_CONNECTIONS)) { - list = serverRegistry.getConnections(); + list = serverRegistry.getConnections(MBeanView.getVirtualHostName()); items = getItems(list); _sortBySizeButton.setVisible(false); } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java index 62871c4c91..1622c231c6 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java @@ -74,9 +74,12 @@ public class MBeanView extends ViewPart // TabFolder to list all the mbeans for a given mbeantype(eg Connection, Queue, Exchange) private TabFolder typeTabFolder = null; + private static String _virtualHostName; + + /* - * Listener for the selection events in the navigation view - */ + * Listener for the selection events in the navigation view + */ private class SelectionListenerImpl implements ISelectionListener { public void selectionChanged(IWorkbenchPart part, ISelection sel) @@ -102,11 +105,11 @@ public class MBeanView extends ViewPart setServer(); try { - if (Constants.TYPE.equals(_selectedNode.getType())) + if (Constants.NODE_TYPE_MBEANTYPE.equals(_selectedNode.getType())) { refreshTypeTabFolder(_selectedNode.getName()); } - else if (Constants.DOMAIN.equals(_selectedNode.getType())) + else if (Constants.NODE_TYPE_VIRTUAL_HOST.equals(_selectedNode.getType())) { refreshTypeTabFolder(typeTabFolder.getItem(0)); } @@ -146,6 +149,19 @@ public class MBeanView extends ViewPart if (parent != null && parent.getType().equals(Constants.SERVER)) _server = (ManagedServer)parent.getManagedObject(); } + + TreeObject parent = _selectedNode; + while (parent != null && !parent.getType().equals(Constants.NODE_TYPE_VIRTUAL_HOST)) + { + parent = parent.getParent(); + } + + if (parent != null) + { + _virtualHostName = parent.getName().substring(1, parent.getName().length()-1); + } + + } public static ManagedServer getServer() @@ -427,15 +443,15 @@ public class MBeanView extends ViewPart typeTabFolder.setData("CONTROLLER", controller); TabItem tab = new TabItem(typeTabFolder, SWT.NONE); - tab.setText(Constants.CONNECTION); + tab.setText(Constants.TAB_LABEL_CONNECTIONS); tab.setControl(controller.getControl()); tab = new TabItem(typeTabFolder, SWT.NONE); - tab.setText(Constants.EXCHANGE); + tab.setText(Constants.TAB_LABEL_EXCHANGES); tab.setControl(controller.getControl()); tab = new TabItem(typeTabFolder, SWT.NONE); - tab.setText(Constants.QUEUE); + tab.setText(Constants.TAB_LABEL_QUEUES); tab.setControl(controller.getControl()); typeTabFolder.addListener(SWT.Selection, new Listener() @@ -469,21 +485,30 @@ public class MBeanView extends ViewPart } typeTabFolder.setSelection(tab); MBeanTypeTabControl controller = (MBeanTypeTabControl)typeTabFolder.getData("CONTROLLER"); - controller.refresh(tab.getText()); + String nodeType = Constants.NODE_LABEL_CONNECTIONS; + if(tab.getText().equals(Constants.TAB_LABEL_QUEUES)) + { + nodeType = Constants.NODE_LABEL_QUEUES; + } + else if(tab.getText().equals(Constants.TAB_LABEL_EXCHANGES)) + { + nodeType = Constants.NODE_LABEL_EXCHANGES; + } + controller.refresh(nodeType); typeTabFolder.setVisible(true); } - private void refreshTypeTabFolder(String type) throws Exception + private void refreshTypeTabFolder(String name) throws Exception { - if (Constants.CONNECTION.equals(type)) + if (Constants.NODE_LABEL_CONNECTIONS.equals(name)) { refreshTypeTabFolder(typeTabFolder.getItem(0)); } - else if (Constants.EXCHANGE.equals(type)) + else if (Constants.NODE_LABEL_EXCHANGES.equals(name)) { refreshTypeTabFolder(typeTabFolder.getItem(1)); } - else if (Constants.QUEUE.equals(type)) + else if (Constants.NODE_LABEL_QUEUES.equals(name)) { refreshTypeTabFolder(typeTabFolder.getItem(2)); } @@ -516,5 +541,11 @@ public class MBeanView extends ViewPart typeTabFolder.setVisible(false); } } + + public static String getVirtualHostName() + { + return _virtualHostName; + + } } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java index ef74f0c230..fd53aa31df 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.qpid.management.ui.ApplicationRegistry; import org.apache.qpid.management.ui.Constants; @@ -39,6 +40,7 @@ import org.apache.qpid.management.ui.exceptions.InfoRequiredException; import org.apache.qpid.management.ui.exceptions.ManagementConsoleException; import org.apache.qpid.management.ui.jmx.JMXServerRegistry; import org.apache.qpid.management.ui.jmx.MBeanUtility; +import org.apache.qpid.management.ui.jmx.JMXManagedObject; import org.eclipse.jface.viewers.DoubleClickEvent; import org.eclipse.jface.viewers.IDoubleClickListener; import org.eclipse.jface.viewers.IFontProvider; @@ -266,7 +268,8 @@ public class NavigationView extends ViewPart catch(Exception ex) { System.out.println("\nError in connecting to Qpid broker "); - System.out.println("\n" + ex.toString()); + System.out.println("\n" + ex); + ex.printStackTrace(); } } @@ -284,27 +287,66 @@ public class NavigationView extends ViewPart // Add these three types - Connection, Exchange, Queue // By adding these, these will always be available, even if there are no mbeans under thse types // This is required because, the mbeans will be added from mbeanview, by selecting from the list - TreeObject typeChild = new TreeObject(Constants.CONNECTION, Constants.TYPE); - typeChild.setParent(domain); - typeChild = new TreeObject(Constants.EXCHANGE, Constants.TYPE); - typeChild.setParent(domain); - typeChild = new TreeObject(Constants.QUEUE, Constants.TYPE); - typeChild.setParent(domain); - - + + + TreeObject virtualhosts = new TreeObject(Constants.NODE_LABEL_VIRTUAL_HOSTS, Constants.NODE_TYPE_MBEANTYPE); + virtualhosts.setParent(domain); + + Map<String, TreeObject> virtualHostMap = new HashMap<String, TreeObject>(); + // Now populate the mbenas under those types List<ManagedBean> mbeans = MBeanUtility.getManagedObjectsForDomain(server, domain.getName()); for (ManagedBean mbean : mbeans) { + + if(mbean.getType().equals(Constants.VIRTUAL_HOST)) + { + TreeObject host = new TreeObject("[" + mbean.getName() + "]", Constants.NODE_TYPE_VIRTUAL_HOST); + + virtualHostMap.put(mbean.getName(), host); + host.setParent(virtualhosts); + + TreeObject child = new TreeObject(Constants.NODE_LABEL_CONNECTIONS, Constants.NODE_TYPE_MBEANTYPE); + + child.setParent(host); + child = new TreeObject(Constants.NODE_LABEL_EXCHANGES, Constants.NODE_TYPE_MBEANTYPE); + child.setParent(host); + child = new TreeObject(Constants.NODE_LABEL_QUEUES, Constants.NODE_TYPE_MBEANTYPE); + child.setParent(host); + + } + } + for (ManagedBean mbean : mbeans) + { + + mbean.setServer(server); - ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server); - serverRegistry.addManagedObject(mbean); + ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server); + serverRegistry.addManagedObject(mbean); + + if (mbean.getType().equals(Constants.MBEAN_TYPE_BROKER_MANAGER)) + { + JMXManagedObject obj = (JMXManagedObject) mbean; + String host = obj.getProperty("VirtualHost"); + TreeObject node = virtualHostMap.get(host); + if(node != null) + { + TreeObject beanNode = new TreeObject(mbean); + beanNode.setParent(node); + } + } + + // Add all mbeans other than Connections, Exchanges and Queues. Because these will be added // manually by selecting from MBeanView - if (!(mbean.getType().equals(Constants.CONNECTION) || mbean.getType().equals(Constants.EXCHANGE) || mbean.getType().equals(Constants.QUEUE))) + if (!(mbean.getType().equals(Constants.MBEAN_TYPE_CONNECTION) + || mbean.getType().equals(Constants.MBEAN_TYPE_EXCHANGE) + || mbean.getType().equals(Constants.MBEAN_TYPE_QUEUE) + || mbean.getType().equals(Constants.VIRTUAL_HOST) + || mbean.getType().equals(Constants.MBEAN_TYPE_BROKER_MANAGER))) { - addManagedBean(domain, mbean); + addManagedBean(domain, mbean, virtualHostMap); } } } @@ -322,7 +364,7 @@ public class NavigationView extends ViewPart for (TreeObject child : childNodes) { - if (Constants.TYPE.equals(child.getType()) && typeName.equals(child.getName())) + if (Constants.NODE_TYPE_MBEANTYPE.equals(child.getType()) && typeName.equals(child.getName())) return child; } return null; @@ -343,13 +385,22 @@ public class NavigationView extends ViewPart * Adds the given MBean to the given domain node. Creates Notification node for the MBean. * @param domain * @param mbean mbean + * @param virtualHostMap */ - private void addManagedBean(TreeObject domain, ManagedBean mbean) throws Exception + private void addManagedBean(TreeObject domain, ManagedBean mbean, Map<String, TreeObject> virtualHostMap) throws Exception { + JMXManagedObject obj = (JMXManagedObject) mbean; + + + String type = mbean.getType(); String name = mbean.getName(); - TreeObject typeNode = getMBeanTypeNode(domain, type); + String virtualHostName = obj.getProperty("VirtualHost"); + + TreeObject virtualHostNode = virtualHostMap.get(virtualHostName); + + TreeObject typeNode = getMBeanTypeNode(virtualHostNode, getNodeLabelForType(type)); if (typeNode != null && doesMBeanNodeAlreadyExist(typeNode, name)) return; @@ -368,8 +419,8 @@ public class NavigationView extends ViewPart // type node does not exist. Now check if node to be created as mbeantype or MBean if (name != null) // A managedObject with type and name { - typeNode = new TreeObject(type, Constants.TYPE); - typeNode.setParent(domain); + typeNode = new TreeObject(type, Constants.NODE_TYPE_MBEANTYPE); + typeNode.setParent(virtualHostNode); mbeanNode = new TreeObject(mbean); mbeanNode.setParent(typeNode); } @@ -385,7 +436,27 @@ public class NavigationView extends ViewPart TreeObject notificationNode = new TreeObject(Constants.NOTIFICATION, Constants.NOTIFICATION); notificationNode.setParent(mbeanNode); } - + + private String getNodeLabelForType(String type) + { + if(type.equals(Constants.MBEAN_TYPE_EXCHANGE)) + { + return Constants.NODE_LABEL_EXCHANGES; + } + else if(type.equals(Constants.MBEAN_TYPE_QUEUE)) + { + return Constants.NODE_LABEL_QUEUES; + } + else if(type.equals(Constants.MBEAN_TYPE_CONNECTION)) + { + return Constants.NODE_LABEL_CONNECTIONS; + } + else + { + return type; + } + } + /** * Removes all the child nodes of the given parent node * @param parent @@ -750,8 +821,26 @@ public class NavigationView extends ViewPart break; } } + for (TreeObject child : domain.getChildren()) + { + if (child.getName().equals(Constants.NODE_LABEL_VIRTUAL_HOSTS)) + { + domain = child; + break; + } + } + Map<String, TreeObject> hostMap = new HashMap<String,TreeObject>(); + + for (TreeObject child: domain.getChildren()) + { + + if(child.getType().equals(Constants.NODE_TYPE_VIRTUAL_HOST)) + { + hostMap.put(child.getName().substring(1,child.getName().length()-1), child); + } + } - addManagedBean(domain, mbean); + addManagedBean(domain, mbean, hostMap); _treeViewer.refresh(); } @@ -773,7 +862,6 @@ public class NavigationView extends ViewPart { for (ManagedBean mbean : removalList) { - System.out.println("removing " + mbean.getName() + " " + mbean.getType()); TreeObject treeServerObject = _managedServerMap.get(mbean.getServer()); List<TreeObject> domains = treeServerObject.getChildren(); TreeObject domain = null; diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java index 3eb93f55d3..c45ad7b362 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java @@ -326,7 +326,6 @@ public class NotificationsTabControl extends TabControl Shell shell = null; public void doubleClick(DoubleClickEvent event) { - System.out.println("DoubleClickEvent" + event); display = Display.getCurrent(); shell = new Shell(display, SWT.BORDER | SWT.CLOSE | SWT.MIN | SWT.MAX | SWT.RESIZE); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java index b0c67d03fe..204dd6f674 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java @@ -216,7 +216,7 @@ public class OperationTabControl extends TabControl } // Customised parameter widgets - if (_mbean.getType().equals(Constants.EXCHANGE) && + if (_mbean.getType().equals(Constants.MBEAN_TYPE_EXCHANGE) && "headers".equals(_mbean.getProperty(Constants.EXCHANGE_TYPE)) && _opData.getName().equalsIgnoreCase("createNewBinding")) { @@ -241,10 +241,10 @@ public class OperationTabControl extends TabControl formData.top = new FormAttachment(0, params.indexOf(param) * heightForAParameter); formData.left = new FormAttachment(label, 5); formData.right = new FormAttachment(valueNumerator); - if (param.getName().equals(Constants.QUEUE)) + if (param.getName().equals(Constants.MBEAN_TYPE_QUEUE)) { Combo combo = new Combo(_paramsComposite, SWT.READ_ONLY | SWT.DROP_DOWN); - String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(); + String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_mbean.getProperty("VirtualHost")); combo.setItems(items); combo.add("Select Queue", 0); combo.select(0); @@ -253,10 +253,10 @@ public class OperationTabControl extends TabControl combo.addSelectionListener(parameterSelectionListener); valueInCombo = true; } - else if (param.getName().equals(Constants.EXCHANGE)) + else if (param.getName().equals(Constants.MBEAN_TYPE_EXCHANGE)) { Combo combo = new Combo(_paramsComposite, SWT.READ_ONLY | SWT.DROP_DOWN); - String[] items = ApplicationRegistry.getServerRegistry(_mbean).getExchangeNames(); + String[] items = ApplicationRegistry.getServerRegistry(_mbean).getExchangeNames(_mbean.getProperty("VirtualHost")); combo.setItems(items); combo.add("Select Exchange", 0); combo.select(0); @@ -358,7 +358,7 @@ public class OperationTabControl extends TabControl formData.right = new FormAttachment(valueNumerator); Combo combo = new Combo(composite, SWT.READ_ONLY | SWT.DROP_DOWN); - String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(); + String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_mbean.getProperty("VirtualHost")); combo.setItems(items); combo.add("Select Queue", 0); combo.select(0); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java index c2066a9277..c21be5d68c 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java @@ -277,7 +277,6 @@ public class ViewUtility // Set the index being shown. compositeHolder.setData(INDEX, index); - System.out.println("index :" + index); return (CompositeData)((Map.Entry)objectData.get(index)).getValue(); } @@ -320,7 +319,6 @@ public class ViewUtility if (itemType.isArray()) { OpenType type = ((ArrayType)itemType).getElementOpenType(); - System.out.println("Array Element type = " + type.getClassName()); // If Byte array and mimetype is text, convert to text string if (type.getClassName().equals(Byte.class.getName())) { @@ -390,7 +388,6 @@ public class ViewUtility try { String textMessage = new String(byteArray, encoding); - System.out.println("\nMessage : \n" + textMessage + "\n"); Text valueText = toolkit.createText(compositeHolder, textMessage, SWT.READ_ONLY | SWT.BORDER | SWT.MULTI | SWT.WRAP | SWT.V_SCROLL); diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java index cbdd498b37..57512929c1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -104,7 +104,7 @@ public class ServiceRequestingClient implements ExceptionListener m.getPropertyNames(); if (m.propertyExists("timeSent")) { - long timeSent = Long.parseLong(m.getStringProperty("timeSent")); + long timeSent = m.getLongProperty("timeSent"); if (_averageLatency == 0) { _averageLatency = _messageReceivedTime - timeSent; diff --git a/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index ec27b8a191..62dc44e23f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -23,6 +23,7 @@ import org.apache.qpid.server.management.ManagedBroker; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.AMQShortString; public class AMQBrokerManagerMBeanTest extends TestCase @@ -40,7 +41,9 @@ public class AMQBrokerManagerMBeanTest extends TestCase assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); - ManagedBroker mbean = new AMQBrokerManagerMBean(); + VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject()); mbean.createNewExchange(exchange1,"direct",false, false); mbean.createNewExchange(exchange2,"topic",false, false); mbean.createNewExchange(exchange3,"headers",false, false); @@ -61,7 +64,9 @@ public class AMQBrokerManagerMBeanTest extends TestCase public void testQueueOperations() throws Exception { String queueName = "testQueue_" + System.currentTimeMillis(); - ManagedBroker mbean = new AMQBrokerManagerMBean(); + VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject()); assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); @@ -77,7 +82,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase { super.setUp(); IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getQueueRegistry(); - _exchangeRegistry = appRegistry.getExchangeRegistry(); + _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry(); + _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry(); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index ea576a5661..84506f4f48 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -178,7 +178,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase public TestQueue(AMQShortString name) throws AMQException { - super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getQueueRegistry()); + super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); } /** diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index 39c47118da..9653155a51 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -21,7 +21,9 @@ import junit.framework.TestCase; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -36,6 +38,7 @@ public class ExchangeMBeanTest extends TestCase { private AMQQueue _queue; private QueueRegistry _queueRegistry; + private VirtualHost _virtualHost; /** * Test for direct exchange mbean @@ -45,7 +48,7 @@ public class ExchangeMBeanTest extends TestCase public void testDirectExchangeMBean() throws Exception { DestNameExchange exchange = new DestNameExchange(); - exchange.initialise(ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -72,7 +75,7 @@ public class ExchangeMBeanTest extends TestCase public void testTopicExchangeMBean() throws Exception { DestWildExchange exchange = new DestWildExchange(); - exchange.initialise(ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -99,7 +102,7 @@ public class ExchangeMBeanTest extends TestCase public void testHeadersExchangeMBean() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -122,8 +125,11 @@ public class ExchangeMBeanTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); - _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _queueRegistry); + + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _virtualHost.getQueueRegistry(); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost); _queueRegistry.registerQueue(_queue); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index c01241d11d..70da7d1692 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.framing.BasicPublishBody; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase @@ -30,7 +31,7 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase protected void setUp() throws Exception { super.setUp(); - ApplicationRegistry.initialise(new TestApplicationRegistry()); + ApplicationRegistry.initialise(new NullApplicationRegistry()); } public void testSimple() throws AMQException diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 546c61eda0..c8271f1549 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -4,6 +4,7 @@ import junit.framework.TestCase; import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
@@ -38,7 +39,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex {
super.setUp();
TransportConnection.createVMBroker(1);
- ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index 594330b945..dac0f06744 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -21,6 +21,9 @@ import junit.framework.TestCase; import org.apache.mina.common.IoSession; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -46,6 +49,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; private AMQProtocolSessionMBean _mbean; + private VirtualHost _virtualHost; public void testChannels() throws Exception { @@ -53,7 +57,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase int channelCount = _mbean.channels().size(); assertTrue(channelCount == 1); AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), - false, new AMQShortString("test"), true, _queueRegistry); + false, new AMQShortString("test"), true, _virtualHost); AMQChannel channel = new AMQChannel(2, _messageStore, null); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); @@ -106,10 +110,12 @@ public class AMQProtocolSessionMBeanTest extends TestCase { super.setUp(); _channel = new AMQChannel(1, _messageStore, null); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory()); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _virtualHost.getQueueRegistry(); + _exchangeRegistry = _virtualHost.getExchangeRegistry(); _mockIOSession = new MockIoSession(); - _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true)); + _protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true)); _protocolSession.addChannel(_channel); _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject(); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 64492e3d67..ba60105824 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -24,6 +24,9 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; @@ -50,6 +53,7 @@ public class AMQQueueMBeanTest extends TestCase new HashSet<Long>()); private MockProtocolSession _protocolSession; private AMQChannel _channel; + private VirtualHost _virtualHost; public void testMessageCount() throws Exception { @@ -180,8 +184,10 @@ public class AMQQueueMBeanTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _queueRegistry = new DefaultQueueRegistry(); - _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _queueRegistry); + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _virtualHost.getQueueRegistry(); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); _queueMBean = new AMQQueueMBean(_queue); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index d10d5acdd0..2ec4eab74e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.LinkedList; import java.util.Set; @@ -67,7 +68,7 @@ public class AckTest extends TestCase public AckTest() throws Exception { - ApplicationRegistry.initialise(new TestApplicationRegistry()); + ApplicationRegistry.initialise(new NullApplicationRegistry()); } protected void setUp() throws Exception @@ -78,7 +79,7 @@ public class AckTest extends TestCase _protocolSession = new MockProtocolSession(_messageStore); _protocolSession.addChannel(_channel); _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, new DefaultQueueRegistry(), _subscriptionManager); + _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); } private void publishMessages(int count) throws AMQException diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java index f090f431c3..6f3d42d090 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; import java.util.concurrent.Executor; @@ -50,11 +53,15 @@ public class ConcurrencyTest extends MessageTestHelper private boolean isComplete; private boolean failed; + private VirtualHost _virtualHost; public ConcurrencyTest() throws Exception { + + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, - new DefaultQueueRegistry())); + _virtualHost)); } public void testConcurrent1() throws InterruptedException, AMQException diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 1fb2a1024f..3f371161c6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; @@ -137,4 +138,24 @@ public class MockProtocolSession implements AMQProtocolSession { return null; } + + public VirtualHost getVirtualHost() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setVirtualHost(VirtualHost virtualHost) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void addSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 34f70bd2db..385b5b598a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -42,9 +43,10 @@ public class SkeletonMessageStore implements MessageStore public void configure(String base, Configuration config) throws Exception { } - - public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception { + //To change body of implemented methods use File | Settings | File Templates. } public void close() throws Exception diff --git a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java index f801daf27c..849285e6d6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -29,14 +29,18 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.NullAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import java.util.HashMap; +import java.util.Collection; public class TestApplicationRegistry extends ApplicationRegistry { @@ -51,6 +55,7 @@ public class TestApplicationRegistry extends ApplicationRegistry private AuthenticationManager _authenticationManager; private MessageStore _messageStore; + private VirtualHost _vHost; public TestApplicationRegistry() { @@ -59,10 +64,12 @@ public class TestApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { - _managedObjectRegistry = new NoopManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _managedObjectRegistry = appRegistry.getManagedObjectRegistry(); + _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _vHost.getQueueRegistry(); + _exchangeFactory = _vHost.getExchangeFactory(); + _exchangeRegistry = _vHost.getExchangeRegistry(); _authenticationManager = new NullAuthenticationManager(); _messageStore = new TestableMemoryMessageStore(); @@ -99,6 +106,16 @@ public class TestApplicationRegistry extends ApplicationRegistry return _authenticationManager; } + public Collection<String> getVirtualHostNames() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public VirtualHostRegistry getVirtualHostRegistry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public MessageStore getMessageStore() { return _messageStore; |