diff options
Diffstat (limited to 'java/broker/src/main')
59 files changed, 911 insertions, 423 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 6f93a14469..2e6293081d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -50,20 +51,28 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr private final ExchangeFactory _exchangeFactory; private final MessageStore _messageStore; + private final VirtualHost.VirtualHostMBean _virtualHostMBean; + @MBeanConstructor("Creates the Broker Manager MBean") - public AMQBrokerManagerMBean() throws JMException + public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException { super(ManagedBroker.class, ManagedBroker.TYPE); - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getQueueRegistry(); - _exchangeRegistry = appRegistry.getExchangeRegistry(); - _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); - _messageStore = ApplicationRegistry.getInstance().getMessageStore(); + + _virtualHostMBean = virtualHostMBean; + VirtualHost virtualHost = virtualHostMBean.getVirtualHost(); + + + + + _queueRegistry = virtualHost.getQueueRegistry(); + _exchangeRegistry = virtualHost.getExchangeRegistry(); + _messageStore = virtualHost.getMessageStore(); + _exchangeFactory = virtualHost.getExchangeFactory(); } public String getObjectInstanceName() { - return this.getClass().getName(); + return _virtualHostMBean.getVirtualHost().getName(); } /** @@ -144,7 +153,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { - queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, _queueRegistry); + queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost()); if (queue.isDurable() && !queue.isAutoDelete()) { _messageStore.createQueue(queue); @@ -157,6 +166,11 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } } + private VirtualHost getVirtualHost() + { + return _virtualHostMBean.getVirtualHost(); + } + /** * Deletes the queue from queue registry and persistant storage. * @@ -183,11 +197,17 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } } - public ObjectName getObjectName() throws MalformedObjectNameException + public ManagedObject getParentObject() { - StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); - objectName.append(":type=").append(getType()); - - return new ObjectName(objectName.toString()); + return _virtualHostMBean; } + +// public ObjectName getObjectName() throws MalformedObjectNameException +// { +// StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); +// objectName.append(".").append(getVirtualHost().getName()); +// objectName.append(":type=").append(getType()); +// +// return new ObjectName(objectName.toString()); +// } } // End of MBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index ffd25de0b4..c45d1ad2c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -62,7 +62,7 @@ import java.util.StringTokenizer; * Main entry point for AMQPD. * */ -public class Main implements ProtocolVersionList, Managable +public class Main implements ProtocolVersionList { private static final Logger _logger = Logger.getLogger(Main.class); @@ -70,7 +70,8 @@ public class Main implements ProtocolVersionList, Managable private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - private AMQBrokerManagerMBean _mbean = null; + + private static Main _instance; protected static class InitException extends Exception { @@ -265,7 +266,6 @@ public class Main implements ProtocolVersionList, Managable } bind(port, connectorConfig); - createAndRegisterBrokerMBean(); } protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException @@ -368,7 +368,7 @@ public class Main implements ProtocolVersionList, Managable public static void main(String[] args) { - new Main(args); + _instance = new Main(args); } private byte[] parseIP(String address) throws Exception @@ -430,21 +430,4 @@ public class Main implements ProtocolVersionList, Managable } } - private void createAndRegisterBrokerMBean() throws AMQException - { - try - { - _mbean = new AMQBrokerManagerMBean(); - _mbean.register(); - } - catch (JMException ex) - { - throw new AMQException("Exception occured in creating AMQBrokerManager MBean"); - } - } - - public ManagedObject getManagedObject() - { - return _mbean; - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java index 5e3ac03ba7..379da94aa3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java @@ -37,14 +37,15 @@ public class Configurator { private static final Logger _logger = Logger.getLogger(Configurator.class); + /** - * Configure a given instance using the application configuration. Note that superclasses are <b>not</b> + * Configure a given instance using the supplied configuration. Note that superclasses are <b>not</b> * currently configured but this could easily be added if required. * @param instance the instance to configure + * @param config the configuration to use to configure the object */ - public static void configure(Object instance) + public static void configure(Object instance, Configuration config) { - final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); for (Field f : instance.getClass().getDeclaredFields()) { @@ -56,6 +57,18 @@ public class Configurator } } + + + /** + * Configure a given instance using the application configuration. Note that superclasses are <b>not</b> + * currently configured but this could easily be added if required. + * @param instance the instance to configure + */ + public static void configure(Object instance) + { + configure(instance, ApplicationRegistry.getInstance().getConfiguration()); + } + private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation) { Class fieldClass = f.getType(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 7e807304c8..361a21b284 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.log4j.Logger; @@ -113,6 +114,7 @@ public class VirtualHostConfiguration } _logger.info("VirtualHost:'" + prop + "'"); + String virtualHost = prop.toString(); prop = _config.getProperty(path + "." + XML_BIND); if (prop instanceof Collection) @@ -121,16 +123,16 @@ public class VirtualHostConfiguration _logger.debug("Number of Bindings: " + bindings); for (int dest = 0; dest < bindings; dest++) { - loadBinding(path, dest); + loadBinding(virtualHost, path, dest); } } else { - loadBinding(path, -1); + loadBinding(virtualHost,path, -1); } } - private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException + private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException { String path = rootpath + "." + XML_BIND; if (index != -1) @@ -146,7 +148,7 @@ public class VirtualHostConfiguration try { - bind(binding); + bind(virtualHost, binding); } catch (AMQException amqe) { @@ -155,7 +157,7 @@ public class VirtualHostConfiguration } } - private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException + private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException { AMQShortString queueName = binding.getQueueName(); @@ -169,9 +171,10 @@ public class VirtualHostConfiguration } //Get references to Broker Registries - QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); - MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore(); - ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry(); + VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore messageStore = virtualHost.getMessageStore(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); synchronized (queueRegistry) { @@ -184,7 +187,7 @@ public class VirtualHostConfiguration queue = new AMQQueue(queueName, Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)), null /* These queues will have no owner */, - false /* Therefore autodelete makes no sence */, queueRegistry); + false /* Therefore autodelete makes no sence */, virtualHost); if (queue.isDurable()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 94c792c358..caafb83568 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,6 +25,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; @@ -34,10 +38,14 @@ public abstract class AbstractExchange implements Exchange, Managable { private AMQShortString _name; + + protected boolean _durable; protected String _exchangeType; protected int _ticket; + private VirtualHost _virtualHost; + protected ExchangeMBean _exchangeMbean; /** @@ -57,6 +65,11 @@ public abstract class AbstractExchange implements Exchange, Managable super(ManagedExchange.class, ManagedExchange.TYPE); } + public ManagedObject getParentObject() + { + return _virtualHost.getManagedObject(); + } + public String getObjectInstanceName() { return _name.toString(); @@ -87,13 +100,17 @@ public abstract class AbstractExchange implements Exchange, Managable return _autoDelete; } - public ObjectName getObjectName() throws MalformedObjectNameException +// public ObjectName getObjectName() throws MalformedObjectNameException +// { +// String objNameString = super.getObjectName().toString(); +// objNameString = objNameString + ",VirtualHost="+ _virtualHost.getName() +",ExchangeType=" + _exchangeType; +// return new ObjectName(objNameString); +// } + + protected ManagedObjectRegistry getManagedObjectRegistry() { - String objNameString = super.getObjectName().toString(); - objNameString = objNameString + ",ExchangeType=" + _exchangeType; - return new ObjectName(objNameString); + return ApplicationRegistry.getInstance().getManagedObjectRegistry(); } - } // End of MBean class public AMQShortString getName() @@ -108,8 +125,9 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract ExchangeMBean createMBean() throws AMQException; - public void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException + public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { + _virtualHost = host; _name = name; _durable = durable; _autoDelete = autoDelete; @@ -151,4 +169,13 @@ public abstract class AbstractExchange implements Exchange, Managable return _exchangeMbean; } + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public QueueRegistry getQueueRegistry() + { + return getVirtualHost().getQueueRegistry(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 77f9819048..d77f1b6c5a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -36,9 +37,11 @@ public class DefaultExchangeFactory implements ExchangeFactory private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); private Map<AMQShortString, Class<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, Class<? extends Exchange>>(); + private final VirtualHost _host; - public DefaultExchangeFactory() + public DefaultExchangeFactory(VirtualHost host) { + _host = host; _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); @@ -59,7 +62,7 @@ public class DefaultExchangeFactory implements ExchangeFactory try { Exchange e = exchClass.newInstance(); - e.initialise(exchange, durable, ticket, autoDelete); + e.initialise(_host, exchange, durable, ticket, autoDelete); return e; } catch (InstantiationException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index fcd6e8fdad..8862bd5104 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -109,7 +109,7 @@ public class DestNameExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index d1b35451b5..c38b9fe9b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import javax.management.MBeanException; @@ -110,7 +111,7 @@ public class DestWildExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 366dcb11b3..c012a1c1c9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -25,13 +25,14 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; public interface Exchange { AMQShortString getName(); AMQShortString getType(); - void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; + void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; boolean isDurable(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 2e7457e4a6..82039c345f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -44,7 +44,7 @@ public class FanoutExchange extends AbstractExchange private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
- @MBeanConstructor("Creates an MBean for AMQ direct exchange")
+ @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
public FanoutExchangeMBean() throws JMException
{
super();
@@ -86,7 +86,7 @@ public class FanoutExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 93933cd88d..3a49ff586b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -150,7 +150,7 @@ public class HeadersExchange extends AbstractExchange */ public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index e12dd4a9db..b2b1b0a716 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.log4j.Logger; public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody> @@ -46,10 +47,10 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicAckBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { + AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + if (_log.isDebugEnabled()) { _log.debug("Ack received on channel " + evt.getChannelId()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index a23a29941f..2bbb696e90 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.AMQException; public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> @@ -45,10 +46,10 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicCancelBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException { + AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); final BasicCancelBody body = evt.getMethod(); channel.unsubscribeConsumer(protocolSession, body.consumerTag); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 721001b454..bc695431c9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -28,6 +28,8 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -53,14 +55,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + BasicConsumeBody body = evt.getMethod(); final int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); + VirtualHost vHost = session.getVirtualHost(); if (channel == null) { _log.error("Channel " + channelId + " not found"); @@ -69,7 +72,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { - AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); + AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue); if (queue == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 1eb3152973..51b585ecc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -10,6 +10,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
@@ -31,12 +33,13 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB {
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<BasicGetBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicGetBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
BasicGetBody body = evt.getMethod();
final int channelId = evt.getChannelId();
+ VirtualHost vHost = session.getVirtualHost();
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -46,7 +49,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB }
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 0ef30be265..78a246d80e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -28,6 +28,8 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; @@ -53,10 +55,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicPublishBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicPublishBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + final BasicPublishBody body = evt.getMethod(); if (_log.isDebugEnabled()) @@ -70,7 +72,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME; } - Exchange e = exchangeRegistry.getExchange(body.exchange); + VirtualHost vHost = session.getVirtualHost(); + Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange); // if the exchange does not exist we raise a channel exception if (e == null) { @@ -82,8 +85,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // The partially populated BasicDeliver frame plus the received route body // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - channel.setPublishFrame(body, protocolSession); + AMQChannel channel = session.getChannel(evt.getChannelId()); + channel.setPublishFrame(body, session); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 2bab4cac5c..325e5226ad 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; @@ -40,9 +41,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> return _instance; } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, - AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index f3e0cc3a63..a247ee33ce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.framing.BasicRecoverBody; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -42,18 +43,18 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic return _instance; } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<BasicRecoverBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException { - _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId()); - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQProtocolSession session = stateManager.getProtocolSession(); + + _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); if (channel == null) { throw new AMQException("Unknown channel " + evt.getChannelId()); } BasicRecoverBody body = evt.getMethod(); - channel.resend(protocolSession, body.requeue); + channel.resend(session, body.requeue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index a24ecd9b01..d46a4f6424 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> { @@ -47,18 +48,17 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelCloseBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ChannelCloseBody body = evt.getMethod(); _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); - protocolSession.closeChannel(evt.getChannelId()); + session.closeChannel(evt.getChannelId()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java index 81a5371829..be11d5e939 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.log4j.Logger; public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody> @@ -45,9 +46,7 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index d92a4eed6a..62f7ed4b78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -48,13 +48,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelFlowBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ChannelFlowBody body = evt.getMethod(); - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); @@ -64,6 +63,6 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0, // AMQP version (major, minor) body.active); // active - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 6ea9dfa595..5cd3f8ac89 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -47,18 +48,18 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ChannelOpenBody> evt) throws AMQException - { - IApplicationRegistry registry = ApplicationRegistry.getInstance(); - final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(), - exchangeRegistry); - protocolSession.addChannel(channel); + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + + final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(), + virtualHost.getExchangeRegistry()); + session.addChannel(channel); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 0fe25a1c89..8bc849d5cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -47,16 +47,15 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionCloseBody body = evt.getMethod(); _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + protocolSession); + body.replyText + " for " + session); try { - protocolSession.closeSession(); + session.closeSession(); } catch (Exception e) { @@ -66,6 +65,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index bcdd86d2ef..c10a731cc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -46,17 +46,16 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener< { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); //todo should this not do more than just log the method? _logger.info("Received Connection-close-ok"); try { stateManager.changeState(AMQState.CONNECTION_CLOSED); - protocolSession.closeSession(); + session.closeSession(); } catch (Exception e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 8056ff9adb..88717c446b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -27,11 +27,13 @@ import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody> { @@ -51,28 +53,39 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con return new AMQShortString(Long.toString(System.currentTimeMillis())); } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ConnectionOpenBody body = evt.getMethod(); - + String virtualHostName = String.valueOf(body.virtualHost); - //todo //FIXME The virtual host must be validated by the server for the connection to open-ok - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (protocolSession.getContextKey() == null) + VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName); + + if(virtualHost == null) { - protocolSession.setContextKey(generateClientID()); + throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName); } + else + { + session.setVirtualHost( virtualHost ); + - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, - (byte)8, (byte)0, // AMQP version (major, minor) - body.virtualHost); // knownHosts - stateManager.changeState(AMQState.CONNECTION_OPEN); - protocolSession.writeFrame(response); + + // See Spec (0.8.2). Section 3.1.2 Virtual Hosts + if (session.getContextKey() == null) + { + session.setContextKey(generateClientID()); + } + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, + (byte)8, (byte)0, // AMQP version (major, minor) + body.virtualHost); + stateManager.changeState(AMQState.CONNECTION_OPEN); + session.writeFrame(response); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index d33874b727..11cbaade30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -54,14 +54,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ConnectionSecureOkBody body = evt.getMethod(); AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager(); - SaslServer ss = protocolSession.getSaslServer(); + SaslServer ss = session.getSaslServer(); if (ss == null) { throw new AMQException("No SASL context set up in session"); @@ -84,8 +83,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId AMQConstant.NOT_ALLOWED.getCode(), // replyCode AMQConstant.NOT_ALLOWED.getName()); // replyText - protocolSession.writeFrame(close); - disposeSaslServer(protocolSession); + session.writeFrame(close); + disposeSaslServer(session); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); @@ -101,8 +100,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener Integer.MAX_VALUE, // channelMax ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax HeartbeatConfig.getInstance().getDelay()); // heartbeat - protocolSession.writeFrame(tune); - disposeSaslServer(protocolSession); + session.writeFrame(tune); + disposeSaslServer(session); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); @@ -112,7 +111,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, (byte)8, (byte)0, // AMQP version (major, minor) authResult.challenge); // challenge - protocolSession.writeFrame(challenge); + session.writeFrame(challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 6cb384f081..b45a017166 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -60,10 +60,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionStartOkBody body = evt.getMethod(); _logger.info("SASL Mechanism selected: " + body.mechanism); _logger.info("Locale selected: " + body.locale); @@ -73,15 +72,15 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< SaslServer ss = null; try { - ss = authMgr.createSaslServer(String.valueOf(body.mechanism), protocolSession.getLocalFQDN()); - protocolSession.setSaslServer(ss); + ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN()); + session.setSaslServer(ss); AuthenticationResult authResult = authMgr.authenticate(ss, body.response); //save clientProperties - if (protocolSession.getClientProperties() == null) + if (session.getClientProperties() == null) { - protocolSession.setClientProperties(body.clientProperties); + session.setClientProperties(body.clientProperties); } switch (authResult.status) @@ -100,7 +99,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< Integer.MAX_VALUE, // channelMax getConfiguredFrameSize(), // frameMax HeartbeatConfig.getInstance().getDelay()); // heartbeat - protocolSession.writeFrame(tune); + session.writeFrame(tune); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); @@ -110,12 +109,12 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, (byte)8, (byte)0, // AMQP version (major, minor) authResult.challenge); // challenge - protocolSession.writeFrame(challenge); + session.writeFrame(challenge); } } catch (SaslException e) { - disposeSaslServer(protocolSession); + disposeSaslServer(session); throw new AMQException("SASL error: " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index 960643325a..020e93b7d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -42,16 +42,15 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C return _instance; } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); ConnectionTuneOkBody body = evt.getMethod(); if (_logger.isDebugEnabled()) { _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - protocolSession.initHeartbeats(body.heartbeat); + session.initHeartbeats(body.heartbeat); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 596e6bf332..30da1398b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; /** * @author Apache Software Foundation @@ -61,10 +62,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. byte major = (byte)8; @@ -79,7 +82,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { throw new AMQException("Exchange exchange must not be null"); } - Exchange exchange = exchangeRegistry.getExchange(exchangeName); + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); AMQFrame response; if (exchange == null) { @@ -112,6 +115,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo } else { + AMQQueue queue = queueRegistry.getQueue(queueName); if (queue == null) { @@ -194,6 +198,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo " to exchange " + exchangeName)); // replyText } } - protocolSession.writeFrame(response); + session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 84e9a4e3f4..03af56ff14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody> { @@ -50,17 +51,19 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange return _instance; } - private final ExchangeFactory exchangeFactory; + private ExchangeDeclareHandler() { - exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final ExchangeDeclareBody body = evt.getMethod(); if (_logger.isDebugEnabled()) { @@ -106,7 +109,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 2faba57e04..fc2a4b1fd5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody> { @@ -45,10 +46,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeDeleteBody body = evt.getMethod(); try { @@ -57,7 +60,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } catch (ExchangeInUseException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 45180d0cb6..0a23b9bd86 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { @@ -50,15 +51,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<QueueBindBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + final QueueBindBody body = evt.getMethod(); final AMQQueue queue; if (body.queue == null) { - queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue(); + queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); if (queue == null) { throw new AMQException("No default queue defined on channel and queue was null"); @@ -94,7 +99,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - protocolSession.writeFrame(response); + session.writeFrame(response); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b62fe22b89..fdf98bb49e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.configuration.Configured; @@ -37,7 +36,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicInteger; @@ -58,18 +57,21 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar private final AtomicInteger _counter = new AtomicInteger(); - private final MessageStore _store; + protected QueueDeclareHandler() { Configurator.configure(this); - _store = ApplicationRegistry.getInstance().getMessageStore(); } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore store = virtualHost.getMessageStore(); + QueueDeclareBody body = evt.getMethod(); // if we aren't given a queue name, we create one which we return to the client @@ -94,10 +96,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - queue = createQueue(body, queueRegistry, protocolSession); + queue = createQueue(body, virtualHost, session); if (queue.isDurable() && !queue.isAutoDelete()) { - _store.createQueue(queue); + store.createQueue(queue); } queueRegistry.registerQueue(queue); if (autoRegister) @@ -109,14 +111,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } } - else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner())) + else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) { // todo - constant throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); } //set this as the default queue on the channel: - protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue); + session.getChannel(evt.getChannelId()).setDefaultQueue(queue); } if (!body.nowait) @@ -130,7 +132,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queue.getMessageCount(), // messageCount body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); - protocolSession.writeFrame(response); + session.writeFrame(response); } } @@ -144,10 +146,43 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return MessageFormat.format("{0,number,0000000000000}", value); } - protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) + protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session) throws AMQException { + final QueueRegistry registry = virtualHost.getQueueRegistry(); AMQShortString owner = body.exclusive ? session.getContextKey() : null; - return new AMQQueue(body.queue, body.durable, owner, body.autoDelete || (!body.durable && body.exclusive), registry); + final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost); + final AMQShortString queueName = queue.getName(); + + if(body.exclusive && !body.durable) + { + final AMQProtocolSession.Task deleteQueueTask = + new AMQProtocolSession.Task() + { + + public void doTask(AMQProtocolSession session) throws AMQException + { + if(registry.getQueue(queueName) == queue) + { + queue.delete(); + } + + } + }; + + session.addSessionCloseTask(deleteQueueTask); + + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) + { + session.removeSessionCloseTask(deleteQueueTask); + } + }); + + + } + + return queue; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 245d86a7a6..3f0833de41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -24,18 +24,14 @@ import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; -import org.apache.qpid.protocol.AMQConstant; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -47,7 +43,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } private final boolean _failIfNotFound; - private final MessageStore _store; public QueueDeleteHandler() { @@ -57,12 +52,16 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete public QueueDeleteHandler(boolean failIfNotFound) { _failIfNotFound = failIfNotFound; - _store = ApplicationRegistry.getInstance().getMessageStore(); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore store = virtualHost.getMessageStore(); + QueueDeleteBody body = evt.getMethod(); AMQQueue queue; if(body.queue == null) @@ -71,7 +70,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } else { - queue = queues.getQueue(body.queue); + queue = queueRegistry.getQueue(body.queue); } if(queue == null) @@ -96,7 +95,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete else { int purged = queue.delete(body.ifUnused, body.ifEmpty); - _store.removeQueue(queue.getName()); + store.removeQueue(queue.getName()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index ad63d36351..c8341ee5b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -9,6 +9,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -34,8 +35,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod _failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -52,7 +57,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod }
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 68b0c584eb..6f6b8b6f3c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -47,10 +48,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<TxCommitBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -58,13 +58,13 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { _log.debug("Commit received on channel " + evt.getChannelId()); } - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); - channel.processReturns(protocolSession); + session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + channel.processReturns(session); } catch(AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index d71c93a6c6..31a28d2275 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody> { @@ -44,20 +45,20 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<TxRollbackBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + try{ - AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + AMQChannel channel = session.getChannel(evt.getChannelId()); channel.rollback(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). - channel.resend(protocolSession, false); + channel.resend(session, false); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 7d66fa9d5c..30b70869f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { @@ -43,14 +44,14 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { } - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<TxSelectBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException { - protocolSession.getChannel(evt.getChannelId()).setLocalTransactional(); + AMQProtocolSession session = stateManager.getProtocolSession(); + + session.getChannel(evt.getChannelId()).setLocalTransactional(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java index 311eb8add9..46bac52e78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java @@ -67,7 +67,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana { try { - ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this); + getManagedObjectRegistry().registerObject(this); } catch (JMException e) { @@ -75,11 +75,16 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana } } + protected ManagedObjectRegistry getManagedObjectRegistry() + { + return ApplicationRegistry.getInstance().getManagedObjectRegistry(); + } + public void unregister() throws AMQException { try { - ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this); + getManagedObjectRegistry().unregisterObject(this); } catch (JMException e) { @@ -91,6 +96,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana { return getObjectInstanceName() + "[" + getType() + "]"; } + /** * Created the ObjectName as per the JMX Specs diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index ed74263596..9ca11abb56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -35,10 +35,10 @@ import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CopyOnWriteArrayList; public class AMQMinaProtocolSession implements AMQProtocolSession, ProtocolVersionList, @@ -65,16 +66,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private AMQShortString _contextKey; + private VirtualHost _virtualHost; + private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); private final AMQStateManager _stateManager; - private final QueueRegistry _queueRegistry; - - private final ExchangeRegistry _exchangeRegistry; - private AMQCodecFactory _codecFactory; private AMQProtocolSessionMBean _managedObject; @@ -93,6 +92,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private byte _major; private byte _minor; private FieldTable _clientProperties; + private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + public ManagedObject getManagedObject() { @@ -100,23 +101,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) throws AMQException { - _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this); + _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + + _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { @@ -124,8 +125,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minaProtocolSession = session; session.setAttachment(this); - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); @@ -461,6 +461,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for(Task task : _taskList) + { + task.doTask(this); + } } } @@ -556,4 +560,27 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _minaProtocolSession.getRemoteAddress(); } + + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 10e23caac3..474714680b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -53,41 +53,26 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); - /** - * The registry of all queues. This is passed to frame listeners when frame - * events occur. - */ - private final QueueRegistry _queueRegistry; + private final IApplicationRegistry _applicationRegistry; - /** - * The registry of all exchanges. This is passed to frame listeners when frame - * events occur. - */ - private final ExchangeRegistry _exchangeRegistry; private boolean _useSSL; public AMQPFastProtocolHandler(Integer applicationRegistryInstance) { - IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance); - - _queueRegistry = registry.getQueueRegistry(); - _exchangeRegistry = registry.getExchangeRegistry(); - _logger.debug("AMQPFastProtocolHandler created"); + this(ApplicationRegistry.getInstance(applicationRegistryInstance)); } - public AMQPFastProtocolHandler(QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) + public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) { - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + _applicationRegistry = applicationRegistry; _logger.debug("AMQPFastProtocolHandler created"); } protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler) { - this(handler._queueRegistry, handler._exchangeRegistry); + this(handler._applicationRegistry); } public void sessionCreated(IoSession protocolSession) throws Exception @@ -95,7 +80,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco SessionUtil.initialize(protocolSession); final AMQCodecFactory codecFactory = new AMQCodecFactory(true); - createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory); + createSession(protocolSession, _applicationRegistry, codecFactory); _logger.info("Protocol session created"); final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory); @@ -120,9 +105,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco /** * Separated into its own, protected, method to allow easier reuse */ - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException { - new AMQMinaProtocolSession(session, queues, exchanges, codec); + new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); } public void sessionOpened(IoSession protocolSession) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java index ff1316f704..07c153bfe8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java @@ -42,8 +42,7 @@ public class AMQPProtocolProvider public AMQPProtocolProvider() { IApplicationRegistry registry = ApplicationRegistry.getInstance(); - _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(), - registry.getExchangeRegistry()); + _handler = new AMQPFastProtocolHandler(registry); } public AMQPFastProtocolHandler getHandler() diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index a1249723ee..ee7e46eba4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; import javax.security.sasl.SaslServer; @@ -32,6 +33,13 @@ import javax.security.sasl.SaslServer; public interface AMQProtocolSession extends AMQProtocolWriter { + + + public static interface Task + { + public void doTask(AMQProtocolSession session) throws AMQException; + } + /** * Called when a protocol data block is received * @param message the data block that has been received @@ -126,4 +134,13 @@ public interface AMQProtocolSession extends AMQProtocolWriter void setClientProperties(FieldTable clientProperties); Object getClientIdentifier(); + + VirtualHost getVirtualHost(); + + void setVirtualHost(VirtualHost virtualHost); + + void addSessionCloseTask(Task task); + + void removeSessionCloseTask(Task task); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index df494915a3..b5fec39626 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.ManagedObject; import javax.management.JMException; import javax.management.MBeanException; @@ -93,6 +94,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed return _session.getIOSession().getRemoteAddress().toString(); } + public ManagedObject getParentObject() + { + return _session.getVirtualHost().getManagedObject(); + } + public Long getWrittenBytes() { return _session.getIOSession().getWrittenBytes(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 18b3adc635..709dd28ad5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -30,11 +30,14 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; +import java.util.ArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException { @@ -95,6 +99,12 @@ public class AMQQueue implements Managable, Comparable private final AtomicBoolean _isExclusive = new AtomicBoolean(); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + + private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + /** * Manages message delivery. @@ -102,11 +112,6 @@ public class AMQQueue implements Managable, Comparable private final DeliveryManager _deliveryMgr; /** - * The queue registry with which this queue is registered. - */ - private final QueueRegistry _queueRegistry; - - /** * Used to track bindings to exchanges so that on deletion they can easily * be cancelled. */ @@ -119,6 +124,9 @@ public class AMQQueue implements Managable, Comparable private final AMQQueueMBean _managedObject; + private final VirtualHost _virtualHost; + + /** * max allowed size(KB) of a single message */ @@ -145,59 +153,26 @@ public class AMQQueue implements Managable, Comparable } public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry) - throws AMQException - { - this(name, durable, owner, autoDelete, queueRegistry, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory()); - } - - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory) + boolean autoDelete, VirtualHost virtualHost) throws AMQException { - this(name, durable, owner, autoDelete, queueRegistry, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory); + this(name, durable, owner, autoDelete, virtualHost, + AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory()); } - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery, - SubscriptionFactory subscriptionFactory) - throws AMQException - { - - this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory); - } - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - - this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), - new SubscriptionImpl.Factory()); - } - - protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, - SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) - throws AMQException - { - this(name, durable, owner, autoDelete, queueRegistry, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory); - } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, + boolean autoDelete, VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException { - this(name, durable, owner, autoDelete, queueRegistry, + this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory()); } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, QueueRegistry queueRegistry, + boolean autoDelete, VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) throws AMQException { @@ -205,18 +180,20 @@ public class AMQQueue implements Managable, Comparable { throw new IllegalArgumentException("Queue name must not be null"); } - if (queueRegistry == null) + if (virtualHost == null) { - throw new IllegalArgumentException("Queue registry must not be null"); + throw new IllegalArgumentException("Virtual Host must not be null"); } _name = name; _durable = durable; _owner = owner; _autoDelete = autoDelete; - _queueRegistry = queueRegistry; + _virtualHost = virtualHost; _asyncDelivery = asyncDelivery; + _managedObject = createMBean(); _managedObject.register(); + _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); @@ -492,10 +469,18 @@ public class AMQQueue implements Managable, Comparable public void delete() throws AMQException { - _subscribers.queueDeleted(this); - _bindings.deregister(); - _queueRegistry.unregisterQueue(_name); - _managedObject.unregister(); + if(!_deleted.getAndSet(true)) + { + _subscribers.queueDeleted(this); + _bindings.deregister(); + _virtualHost.getQueueRegistry().unregisterQueue(_name); + _managedObject.unregister(); + for(Task task : _deleteTaskList) + { + task.doTask(this); + } + _deleteTaskList.clear(); + } } protected void autodelete() throws AMQException @@ -620,6 +605,24 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.performGet(session, channel, acks); } - + public QueueRegistry getQueueRegistry() + { + return _virtualHost.getQueueRegistry(); + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public static interface Task + { + public void doTask(AMQQueue queue) throws AMQException; + } + + public void addQueueDeleteTask(Task task) + { + _deleteTaskList.add(task); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 012b3600ca..ab67012b19 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -20,7 +20,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.Main; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -28,11 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; import javax.management.openmbean.*; -import javax.management.JMException; -import javax.management.Notification; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.OperationsException; +import javax.management.*; import javax.management.monitor.MonitorNotification; import java.util.List; import java.util.ArrayList; @@ -73,6 +71,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); } + + public ManagedObject getParentObject() + { + return _queue.getVirtualHost().getManagedObject(); + } + static { try @@ -373,6 +377,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue return _messageList; } +// +// public ObjectName getObjectName() throws MalformedObjectNameException +// { +// String objNameString = super.getObjectName().toString(); +// +// return new ObjectName(objNameString); +// } + /** * returns Notifications sent by this MBean. diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 8ab26def74..084612ca41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.AMQShortString; import java.util.concurrent.ConcurrentMap; @@ -30,8 +31,16 @@ public class DefaultQueueRegistry implements QueueRegistry { private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - public DefaultQueueRegistry() + private final VirtualHost _virtualHost; + + public DefaultQueueRegistry(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + public VirtualHost getVirtualHost() { + return _virtualHost; } public void registerQueue(AMQQueue queue) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index bfbaf27c84..c5f235f1b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -21,11 +21,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.AMQShortString; public interface QueueRegistry { + VirtualHost getVirtualHost(); + void registerQueue(AMQQueue queue) throws AMQException; void unregisterQueue(AMQShortString name) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 48331843e5..0630d4f39f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.registry; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.HashMap; import java.util.Iterator; @@ -38,7 +39,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); - private static Map _instanceMap = new HashMap(); + private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>(); private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>(); @@ -62,20 +63,13 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { synchronized (ApplicationRegistry.class) { - Iterator keyIterator = _instanceMap.keySet().iterator(); + Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator(); while (keyIterator.hasNext()) { - int key = (Integer) keyIterator.next(); - IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key); - - if ((instance != null)) - { - if (instance.getMessageStore() != null) - { - instance.getMessageStore().close(); - } - } + IApplicationRegistry instance = keyIterator.next(); + + instance.close(); } } } @@ -118,7 +112,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { try { - ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close(); + _instanceMap.get(instanceID).close(); } catch (Exception e) { @@ -143,7 +137,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static IApplicationRegistry getInstance(int instanceID) { - IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID); + IApplicationRegistry instance = _instanceMap.get(instanceID); if (instance == null) { @@ -168,6 +162,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + public void close() throws Exception + { + for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) + { + virtualHost.close(); + } + } + public Configuration getConfiguration() { return _configuration; @@ -193,6 +195,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return instance; } + + public static void setDefaultApplicationRegistry(String clazz) { _APPLICATION_REGISTRY = clazz; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 1eb490d6fb..790421b497 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -38,22 +38,26 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.SASLAuthenticationManager; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.mina.common.ByteBuffer; import java.io.File; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; private ManagedObjectRegistry _managedObjectRegistry; private AuthenticationManager _authenticationManager; - private MessageStore _messageStore; + private VirtualHostRegistry _virtualHostRegistry; + + + private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>(); + public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { @@ -91,11 +95,19 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { initialiseManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + _virtualHostRegistry = new VirtualHostRegistry(); _authenticationManager = new SASLAuthenticationManager(); - initialiseMessageStore(); + + initialiseVirtualHosts(); + } + + private void initialiseVirtualHosts() throws Exception + { + for(String name : getVirtualHostNames()) + { + + _virtualHostRegistry.registerVirtualHost(new VirtualHost(name,getConfiguration().subset("virtualhosts.virtualhost."+name))); + } } private void initialiseManagedObjectRegistry() @@ -111,34 +123,10 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } } - private void initialiseMessageStore() throws Exception - { - String messageStoreClass = _configuration.getString("store.class"); - Class clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); - if (!(o instanceof MessageStore)) - { - throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz + - " does not."); - } - _messageStore = (MessageStore) o; - _messageStore.configure(getQueueRegistry(), "store", _configuration); - } - - public QueueRegistry getQueueRegistry() + public VirtualHostRegistry getVirtualHostRegistry() { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; + return _virtualHostRegistry; } public ManagedObjectRegistry getManagedObjectRegistry() @@ -151,8 +139,8 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry return _authenticationManager; } - public MessageStore getMessageStore() + public Collection<String> getVirtualHostNames() { - return _messageStore; - } + return getConfiguration().getList("virtualhosts.virtualhost.name"); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index cd664f9a4b..703aed69d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -26,8 +26,12 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.commons.configuration.Configuration; +import java.util.Collection; + public interface IApplicationRegistry { /** @@ -35,7 +39,9 @@ public interface IApplicationRegistry * that need access to the application registry itself for initialisation are able to use it. Attempting to * initialise in the constructor will lead to failures since the registry reference will not have been set. */ - void initialise() throws Exception; + void initialise() throws Exception; + + void close() throws Exception; /** * This gets access to a "configured object". A configured object has fields populated from a the configuration @@ -54,15 +60,11 @@ public interface IApplicationRegistry */ Configuration getConfiguration(); - QueueRegistry getQueueRegistry(); - - ExchangeRegistry getExchangeRegistry(); - - ExchangeFactory getExchangeFactory(); - ManagedObjectRegistry getManagedObjectRegistry(); AuthenticationManager getAuthenticationManager(); - MessageStore getMessageStore(); + Collection<String> getVirtualHostNames(); + + VirtualHostRegistry getVirtualHostRegistry(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 81ce704026..7b8ba1d9cc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.log4j.Logger; import java.util.HashMap; @@ -46,8 +47,8 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final QueueRegistry _queueRegistry; - private final ExchangeRegistry _exchangeRegistry; + + private final VirtualHostRegistry _virtualHostRegistry; private final AMQProtocolSession _protocolSession; /** * The current state @@ -63,15 +64,15 @@ public class AMQStateManager implements AMQMethodListener private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + + public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession); + this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession); } - protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - _queueRegistry = queueRegistry; - _exchangeRegistry = exchangeRegistry; + _virtualHostRegistry = virtualHostRegistry; _protocolSession = protocolSession; _currentState = initial; if (register) @@ -176,7 +177,7 @@ public class AMQStateManager implements AMQMethodListener checkChannel(evt, _protocolSession); - handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); + handler.methodReceived(this, evt); return true; } return false; @@ -241,4 +242,14 @@ public class AMQStateManager implements AMQMethodListener { _stateListeners.remove(listener); } + + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java index 56323258b7..99d5d7fe88 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java @@ -25,6 +25,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.framing.AMQMethodBody; /** @@ -34,7 +35,5 @@ import org.apache.qpid.framing.AMQMethodBody; */ public interface StateAwareMethodListener <B extends AMQMethodBody> { - void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<B> evt) throws AMQException; + void methodReceived(AMQStateManager stateManager, AMQMethodEvent<B> evt) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 98a4c3f6e7..eaaffa2dce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.List; @@ -67,7 +68,7 @@ public class MemoryMessageStore implements MessageStore _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity); } - public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception { configure(base, config); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index c9c7045402..8daad0e5e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -35,13 +36,13 @@ public interface MessageStore /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. - * @param queueRegistry the registry of queues to be used by this store + * @param virtualHost the virtual host using by this store * @param base the base element identifier from which all configuration items are relative. For example, if the base * element is "store", the all elements used by concrete classes will be "store.foo" etc. * @param config the apache commons configuration object * @throws Exception if an error occurs that means the store is unable to configure itself */ - void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception; + void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 2e77f33363..e9a3a3d048 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -33,24 +33,23 @@ import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.NullAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import java.util.HashMap; +import java.util.Collection; +import java.util.Collections; +import java.util.Arrays; public class NullApplicationRegistry extends ApplicationRegistry { - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - private ManagedObjectRegistry _managedObjectRegistry; private AuthenticationManager _authenticationManager; - private MessageStore _messageStore; + private VirtualHostRegistry _virtualHostRegistry; public NullApplicationRegistry() @@ -60,15 +59,16 @@ public class NullApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { + _configuration.addProperty("store.class","org.apache.qpid.server.store.MemoryMessageStore"); + _managedObjectRegistry = new NoopManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + _virtualHostRegistry = new VirtualHostRegistry(); + VirtualHost dummyHost = new VirtualHost("test",getConfiguration()); + _virtualHostRegistry.registerVirtualHost(dummyHost); _authenticationManager = new NullAuthenticationManager(); - _messageStore = new MemoryMessageStore(); - ((MemoryMessageStore)_messageStore).configure(); _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } public Configuration getConfiguration() @@ -76,20 +76,6 @@ public class NullApplicationRegistry extends ApplicationRegistry return _configuration; } - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } public ManagedObjectRegistry getManagedObjectRegistry() { @@ -101,9 +87,15 @@ public class NullApplicationRegistry extends ApplicationRegistry return _authenticationManager; } - public MessageStore getMessageStore() + public Collection<String> getVirtualHostNames() + { + String[] hosts = {"test"}; + return Arrays.asList( hosts ); + } + + public VirtualHostRegistry getVirtualHostRegistry() { - return _messageStore; + return _virtualHostRegistry; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java new file mode 100644 index 0000000000..2c888caac1 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java @@ -0,0 +1,51 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.queue.ManagedQueue;
+import org.apache.qpid.server.exchange.ManagedExchange;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of an Exchange.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the exchange.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java new file mode 100644 index 0000000000..15bad19e58 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -0,0 +1,193 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.management.*;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import javax.management.NotCompliantMBeanException;
+
+public class VirtualHost
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from
+ * management intrerface for exchanges. Any implementaion of an
+ * Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+
+
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ _virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+
+ initialiseMessageStore(hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public void close() throws Exception
+ {
+ if(_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java new file mode 100644 index 0000000000..25f67c1cf3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -0,0 +1,52 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class VirtualHostRegistry
+{
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+ public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+ {
+ if(_registry.containsKey(host.getName()))
+ {
+ throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+ }
+ _registry.put(host.getName(),host);
+ }
+
+ public VirtualHost getVirtualHost(String name)
+ {
+ return _registry.get(name);
+ }
+
+ public Collection<VirtualHost> getVirtualHosts()
+ {
+ return new ArrayList<VirtualHost>(_registry.values());
+ }
+}
|