summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-24 15:41:48 +0000
committerRobert Greig <rgreig@apache.org>2007-01-24 15:41:48 +0000
commitb89531eed28cf2eee4fd841be57d27c0d5bcf744 (patch)
tree3e86825d875a7cae7943ae17e0350cee5aec0bc5 /java/broker/src/main/java/org/apache/qpid/server
parent7c43996f3c10426d6593b7224486a6b0331c7259 (diff)
downloadqpid-python-b89531eed28cf2eee4fd841be57d27c0d5bcf744.tar.gz
QPID-50 : Patch supplied by Rob Godfrey - Virtual Host implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java107
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java193
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java52
59 files changed, 911 insertions, 423 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 6f93a14469..2e6293081d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -50,20 +51,28 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
private final ExchangeFactory _exchangeFactory;
private final MessageStore _messageStore;
+ private final VirtualHost.VirtualHostMBean _virtualHostMBean;
+
@MBeanConstructor("Creates the Broker Manager MBean")
- public AMQBrokerManagerMBean() throws JMException
+ public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException
{
super(ManagedBroker.class, ManagedBroker.TYPE);
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
- _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
- _messageStore = ApplicationRegistry.getInstance().getMessageStore();
+
+ _virtualHostMBean = virtualHostMBean;
+ VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
+
+
+
+
+ _queueRegistry = virtualHost.getQueueRegistry();
+ _exchangeRegistry = virtualHost.getExchangeRegistry();
+ _messageStore = virtualHost.getMessageStore();
+ _exchangeFactory = virtualHost.getExchangeFactory();
}
public String getObjectInstanceName()
{
- return this.getClass().getName();
+ return _virtualHostMBean.getVirtualHost().getName();
}
/**
@@ -144,7 +153,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
- queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, _queueRegistry);
+ queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
@@ -157,6 +166,11 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
}
}
+ private VirtualHost getVirtualHost()
+ {
+ return _virtualHostMBean.getVirtualHost();
+ }
+
/**
* Deletes the queue from queue registry and persistant storage.
*
@@ -183,11 +197,17 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
}
}
- public ObjectName getObjectName() throws MalformedObjectNameException
+ public ManagedObject getParentObject()
{
- StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
- objectName.append(":type=").append(getType());
-
- return new ObjectName(objectName.toString());
+ return _virtualHostMBean;
}
+
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
+// objectName.append(".").append(getVirtualHost().getName());
+// objectName.append(":type=").append(getType());
+//
+// return new ObjectName(objectName.toString());
+// }
} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index ffd25de0b4..c45d1ad2c2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -62,7 +62,7 @@ import java.util.StringTokenizer;
* Main entry point for AMQPD.
*
*/
-public class Main implements ProtocolVersionList, Managable
+public class Main implements ProtocolVersionList
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -70,7 +70,8 @@ public class Main implements ProtocolVersionList, Managable
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
- private AMQBrokerManagerMBean _mbean = null;
+
+ private static Main _instance;
protected static class InitException extends Exception
{
@@ -265,7 +266,6 @@ public class Main implements ProtocolVersionList, Managable
}
bind(port, connectorConfig);
- createAndRegisterBrokerMBean();
}
protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
@@ -368,7 +368,7 @@ public class Main implements ProtocolVersionList, Managable
public static void main(String[] args)
{
- new Main(args);
+ _instance = new Main(args);
}
private byte[] parseIP(String address) throws Exception
@@ -430,21 +430,4 @@ public class Main implements ProtocolVersionList, Managable
}
}
- private void createAndRegisterBrokerMBean() throws AMQException
- {
- try
- {
- _mbean = new AMQBrokerManagerMBean();
- _mbean.register();
- }
- catch (JMException ex)
- {
- throw new AMQException("Exception occured in creating AMQBrokerManager MBean");
- }
- }
-
- public ManagedObject getManagedObject()
- {
- return _mbean;
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
index 5e3ac03ba7..379da94aa3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
@@ -37,14 +37,15 @@ public class Configurator
{
private static final Logger _logger = Logger.getLogger(Configurator.class);
+
/**
- * Configure a given instance using the application configuration. Note that superclasses are <b>not</b>
+ * Configure a given instance using the supplied configuration. Note that superclasses are <b>not</b>
* currently configured but this could easily be added if required.
* @param instance the instance to configure
+ * @param config the configuration to use to configure the object
*/
- public static void configure(Object instance)
+ public static void configure(Object instance, Configuration config)
{
- final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
for (Field f : instance.getClass().getDeclaredFields())
{
@@ -56,6 +57,18 @@ public class Configurator
}
}
+
+
+ /**
+ * Configure a given instance using the application configuration. Note that superclasses are <b>not</b>
+ * currently configured but this could easily be added if required.
+ * @param instance the instance to configure
+ */
+ public static void configure(Object instance)
+ {
+ configure(instance, ApplicationRegistry.getInstance().getConfiguration());
+ }
+
private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation)
{
Class fieldClass = f.getType();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 7e807304c8..361a21b284 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.log4j.Logger;
@@ -113,6 +114,7 @@ public class VirtualHostConfiguration
}
_logger.info("VirtualHost:'" + prop + "'");
+ String virtualHost = prop.toString();
prop = _config.getProperty(path + "." + XML_BIND);
if (prop instanceof Collection)
@@ -121,16 +123,16 @@ public class VirtualHostConfiguration
_logger.debug("Number of Bindings: " + bindings);
for (int dest = 0; dest < bindings; dest++)
{
- loadBinding(path, dest);
+ loadBinding(virtualHost, path, dest);
}
}
else
{
- loadBinding(path, -1);
+ loadBinding(virtualHost,path, -1);
}
}
- private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
+ private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
{
String path = rootpath + "." + XML_BIND;
if (index != -1)
@@ -146,7 +148,7 @@ public class VirtualHostConfiguration
try
{
- bind(binding);
+ bind(virtualHost, binding);
}
catch (AMQException amqe)
{
@@ -155,7 +157,7 @@ public class VirtualHostConfiguration
}
}
- private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+ private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException
{
AMQShortString queueName = binding.getQueueName();
@@ -169,9 +171,10 @@ public class VirtualHostConfiguration
}
//Get references to Broker Registries
- QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
- MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
- ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore messageStore = virtualHost.getMessageStore();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
synchronized (queueRegistry)
{
@@ -184,7 +187,7 @@ public class VirtualHostConfiguration
queue = new AMQQueue(queueName,
Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
null /* These queues will have no owner */,
- false /* Therefore autodelete makes no sence */, queueRegistry);
+ false /* Therefore autodelete makes no sence */, virtualHost);
if (queue.isDurable())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 94c792c358..caafb83568 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -25,6 +25,10 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
@@ -34,10 +38,14 @@ public abstract class AbstractExchange implements Exchange, Managable
{
private AMQShortString _name;
+
+
protected boolean _durable;
protected String _exchangeType;
protected int _ticket;
+ private VirtualHost _virtualHost;
+
protected ExchangeMBean _exchangeMbean;
/**
@@ -57,6 +65,11 @@ public abstract class AbstractExchange implements Exchange, Managable
super(ManagedExchange.class, ManagedExchange.TYPE);
}
+ public ManagedObject getParentObject()
+ {
+ return _virtualHost.getManagedObject();
+ }
+
public String getObjectInstanceName()
{
return _name.toString();
@@ -87,13 +100,17 @@ public abstract class AbstractExchange implements Exchange, Managable
return _autoDelete;
}
- public ObjectName getObjectName() throws MalformedObjectNameException
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// String objNameString = super.getObjectName().toString();
+// objNameString = objNameString + ",VirtualHost="+ _virtualHost.getName() +",ExchangeType=" + _exchangeType;
+// return new ObjectName(objNameString);
+// }
+
+ protected ManagedObjectRegistry getManagedObjectRegistry()
{
- String objNameString = super.getObjectName().toString();
- objNameString = objNameString + ",ExchangeType=" + _exchangeType;
- return new ObjectName(objNameString);
+ return ApplicationRegistry.getInstance().getManagedObjectRegistry();
}
-
} // End of MBean class
public AMQShortString getName()
@@ -108,8 +125,9 @@ public abstract class AbstractExchange implements Exchange, Managable
*/
protected abstract ExchangeMBean createMBean() throws AMQException;
- public void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+ public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
{
+ _virtualHost = host;
_name = name;
_durable = durable;
_autoDelete = autoDelete;
@@ -151,4 +169,13 @@ public abstract class AbstractExchange implements Exchange, Managable
return _exchangeMbean;
}
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return getVirtualHost().getQueueRegistry();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 77f9819048..d77f1b6c5a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -36,9 +37,11 @@ public class DefaultExchangeFactory implements ExchangeFactory
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
private Map<AMQShortString, Class<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, Class<? extends Exchange>>();
+ private final VirtualHost _host;
- public DefaultExchangeFactory()
+ public DefaultExchangeFactory(VirtualHost host)
{
+ _host = host;
_exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class);
_exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class);
_exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class);
@@ -59,7 +62,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
try
{
Exchange e = exchClass.newInstance();
- e.initialise(exchange, durable, ticket, autoDelete);
+ e.initialise(_host, exchange, durable, ticket, autoDelete);
return e;
}
catch (InstantiationException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index fcd6e8fdad..8862bd5104 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -109,7 +109,7 @@ public class DestNameExchange extends AbstractExchange
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index d1b35451b5..c38b9fe9b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -110,7 +111,7 @@ public class DestWildExchange extends AbstractExchange
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 366dcb11b3..c012a1c1c9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -25,13 +25,14 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public interface Exchange
{
AMQShortString getName();
AMQShortString getType();
- void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+ void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
boolean isDurable();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 2e7457e4a6..82039c345f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -44,7 +44,7 @@ public class FanoutExchange extends AbstractExchange
private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
- @MBeanConstructor("Creates an MBean for AMQ direct exchange")
+ @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
public FanoutExchangeMBean() throws JMException
{
super();
@@ -86,7 +86,7 @@ public class FanoutExchange extends AbstractExchange
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 93933cd88d..3a49ff586b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -150,7 +150,7 @@ public class HeadersExchange extends AbstractExchange
*/
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index e12dd4a9db..b2b1b0a716 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
@@ -46,10 +47,10 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicAckBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
+ AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+
if (_log.isDebugEnabled())
{
_log.debug("Ack received on channel " + evt.getChannelId());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index a23a29941f..2bbb696e90 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
@@ -45,10 +46,10 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
{
+ AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
final BasicCancelBody body = evt.getMethod();
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 721001b454..bc695431c9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -28,6 +28,8 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -53,14 +55,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
BasicConsumeBody body = evt.getMethod();
final int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
+ VirtualHost vHost = session.getVirtualHost();
if (channel == null)
{
_log.error("Channel " + channelId + " not found");
@@ -69,7 +72,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 1eb3152973..51b585ecc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -10,6 +10,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
@@ -31,12 +33,13 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<BasicGetBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicGetBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
BasicGetBody body = evt.getMethod();
final int channelId = evt.getChannelId();
+ VirtualHost vHost = session.getVirtualHost();
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -46,7 +49,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 0ef30be265..78a246d80e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -28,6 +28,8 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -53,10 +55,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicPublishBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicPublishBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
final BasicPublishBody body = evt.getMethod();
if (_log.isDebugEnabled())
@@ -70,7 +72,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
}
- Exchange e = exchangeRegistry.getExchange(body.exchange);
+ VirtualHost vHost = session.getVirtualHost();
+ Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
// if the exchange does not exist we raise a channel exception
if (e == null)
{
@@ -82,8 +85,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// The partially populated BasicDeliver frame plus the received route body
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
- channel.setPublishFrame(body, protocolSession);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.setPublishFrame(body, session);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 2bab4cac5c..325e5226ad 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
@@ -40,9 +41,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
return _instance;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
- AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index f3e0cc3a63..a247ee33ce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.framing.BasicRecoverBody;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -42,18 +43,18 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
{
- _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
if (channel == null)
{
throw new AMQException("Unknown channel " + evt.getChannelId());
}
BasicRecoverBody body = evt.getMethod();
- channel.resend(protocolSession, body.requeue);
+ channel.resend(session, body.requeue);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index a24ecd9b01..d46a4f6424 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
{
@@ -47,18 +48,17 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- protocolSession.closeChannel(evt.getChannelId());
+ session.closeChannel(evt.getChannelId());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
index 81a5371829..be11d5e939 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -45,9 +46,7 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index d92a4eed6a..62f7ed4b78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -48,13 +48,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ChannelFlowBody body = evt.getMethod();
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
@@ -64,6 +63,6 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
(byte)8, (byte)0, // AMQP version (major, minor)
body.active); // active
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 6ea9dfa595..5cd3f8ac89 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -47,18 +48,18 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
- {
- IApplicationRegistry registry = ApplicationRegistry.getInstance();
- final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
- exchangeRegistry);
- protocolSession.addChannel(channel);
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+
+ final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(),
+ virtualHost.getExchangeRegistry());
+ session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 0fe25a1c89..8bc849d5cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -47,16 +47,15 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
_logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + protocolSession);
+ body.replyText + " for " + session);
try
{
- protocolSession.closeSession();
+ session.closeSession();
}
catch (Exception e)
{
@@ -66,6 +65,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
index bcdd86d2ef..c10a731cc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
@@ -46,17 +46,16 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
//todo should this not do more than just log the method?
_logger.info("Received Connection-close-ok");
try
{
stateManager.changeState(AMQState.CONNECTION_CLOSED);
- protocolSession.closeSession();
+ session.closeSession();
}
catch (Exception e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 8056ff9adb..88717c446b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -27,11 +27,13 @@ import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
{
@@ -51,28 +53,39 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
return new AMQShortString(Long.toString(System.currentTimeMillis()));
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ConnectionOpenBody body = evt.getMethod();
-
+ String virtualHostName = String.valueOf(body.virtualHost);
- //todo //FIXME The virtual host must be validated by the server for the connection to open-ok
- // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (protocolSession.getContextKey() == null)
+ VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
+
+ if(virtualHost == null)
{
- protocolSession.setContextKey(generateClientID());
+ throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName);
}
+ else
+ {
+ session.setVirtualHost( virtualHost );
+
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- body.virtualHost); // knownHosts
- stateManager.changeState(AMQState.CONNECTION_OPEN);
- protocolSession.writeFrame(response);
+
+ // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
+ if (session.getContextKey() == null)
+ {
+ session.setContextKey(generateClientID());
+ }
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.virtualHost);
+ stateManager.changeState(AMQState.CONNECTION_OPEN);
+ session.writeFrame(response);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index d33874b727..11cbaade30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -54,14 +54,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ConnectionSecureOkBody body = evt.getMethod();
AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
- SaslServer ss = protocolSession.getSaslServer();
+ SaslServer ss = session.getSaslServer();
if (ss == null)
{
throw new AMQException("No SASL context set up in session");
@@ -84,8 +83,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
AMQConstant.NOT_ALLOWED.getName()); // replyText
- protocolSession.writeFrame(close);
- disposeSaslServer(protocolSession);
+ session.writeFrame(close);
+ disposeSaslServer(session);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
@@ -101,8 +100,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
Integer.MAX_VALUE, // channelMax
ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeFrame(tune);
- disposeSaslServer(protocolSession);
+ session.writeFrame(tune);
+ disposeSaslServer(session);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
@@ -112,7 +111,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
(byte)8, (byte)0, // AMQP version (major, minor)
authResult.challenge); // challenge
- protocolSession.writeFrame(challenge);
+ session.writeFrame(challenge);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 6cb384f081..b45a017166 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -60,10 +60,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionStartOkBody body = evt.getMethod();
_logger.info("SASL Mechanism selected: " + body.mechanism);
_logger.info("Locale selected: " + body.locale);
@@ -73,15 +72,15 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
SaslServer ss = null;
try
{
- ss = authMgr.createSaslServer(String.valueOf(body.mechanism), protocolSession.getLocalFQDN());
- protocolSession.setSaslServer(ss);
+ ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+ session.setSaslServer(ss);
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
//save clientProperties
- if (protocolSession.getClientProperties() == null)
+ if (session.getClientProperties() == null)
{
- protocolSession.setClientProperties(body.clientProperties);
+ session.setClientProperties(body.clientProperties);
}
switch (authResult.status)
@@ -100,7 +99,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
Integer.MAX_VALUE, // channelMax
getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeFrame(tune);
+ session.writeFrame(tune);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
@@ -110,12 +109,12 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
(byte)8, (byte)0, // AMQP version (major, minor)
authResult.challenge); // challenge
- protocolSession.writeFrame(challenge);
+ session.writeFrame(challenge);
}
}
catch (SaslException e)
{
- disposeSaslServer(protocolSession);
+ disposeSaslServer(session);
throw new AMQException("SASL error: " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
index 960643325a..020e93b7d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
@@ -42,16 +42,15 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ConnectionTuneOkBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.initHeartbeats(body.heartbeat);
+ session.initHeartbeats(body.heartbeat);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 596e6bf332..30da1398b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/**
* @author Apache Software Foundation
@@ -61,10 +62,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
byte major = (byte)8;
@@ -79,7 +82,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
throw new AMQException("Exchange exchange must not be null");
}
- Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
AMQFrame response;
if (exchange == null)
{
@@ -112,6 +115,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
}
else
{
+
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
@@ -194,6 +198,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
" to exchange " + exchangeName)); // replyText
}
}
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 84e9a4e3f4..03af56ff14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
{
@@ -50,17 +51,19 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
return _instance;
}
- private final ExchangeFactory exchangeFactory;
+
private ExchangeDeclareHandler()
{
- exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
final ExchangeDeclareBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
@@ -106,7 +109,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 2faba57e04..fc2a4b1fd5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
@@ -45,10 +46,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
ExchangeDeleteBody body = evt.getMethod();
try
{
@@ -57,7 +60,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
catch (ExchangeInUseException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 45180d0cb6..0a23b9bd86 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -50,15 +51,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
final QueueBindBody body = evt.getMethod();
final AMQQueue queue;
if (body.queue == null)
{
- queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue();
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
if (queue == null)
{
throw new AMQException("No default queue defined on channel and queue was null");
@@ -94,7 +99,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index b62fe22b89..fdf98bb49e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.configuration.Configured;
@@ -37,7 +36,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,18 +57,21 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
private final AtomicInteger _counter = new AtomicInteger();
- private final MessageStore _store;
+
protected QueueDeclareHandler()
{
Configurator.configure(this);
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeclareBody body = evt.getMethod();
// if we aren't given a queue name, we create one which we return to the client
@@ -94,10 +96,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- queue = createQueue(body, queueRegistry, protocolSession);
+ queue = createQueue(body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _store.createQueue(queue);
+ store.createQueue(queue);
}
queueRegistry.registerQueue(queue);
if (autoRegister)
@@ -109,14 +111,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
// todo - constant
throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
}
//set this as the default queue on the channel:
- protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
if (!body.nowait)
@@ -130,7 +132,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
@@ -144,10 +146,43 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
throws AMQException
{
+ final QueueRegistry registry = virtualHost.getQueueRegistry();
AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- return new AMQQueue(body.queue, body.durable, owner, body.autoDelete || (!body.durable && body.exclusive), registry);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ final AMQShortString queueName = queue.getName();
+
+ if(body.exclusive && !body.durable)
+ {
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ if(registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
+
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+
+
+ }
+
+ return queue;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 245d86a7a6..3f0833de41 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -24,18 +24,14 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -47,7 +43,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
private final boolean _failIfNotFound;
- private final MessageStore _store;
public QueueDeleteHandler()
{
@@ -57,12 +52,16 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
public QueueDeleteHandler(boolean failIfNotFound)
{
_failIfNotFound = failIfNotFound;
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -71,7 +70,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
@@ -96,7 +95,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
else
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName());
+ store.removeQueue(queue.getName());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index ad63d36351..c8341ee5b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -9,6 +9,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -34,8 +35,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -52,7 +57,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 68b0c584eb..6f6b8b6f3c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,10 +48,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxCommitBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -58,13 +58,13 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
_log.debug("Commit received on channel " + evt.getChannelId());
}
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
- channel.processReturns(protocolSession);
+ session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ channel.processReturns(session);
}
catch(AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index d71c93a6c6..31a28d2275 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
{
@@ -44,20 +45,20 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
try{
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.rollback();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession, false);
+ channel.resend(session, false);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index 7d66fa9d5c..30b70869f9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
@@ -43,14 +44,14 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxSelectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setLocalTransactional();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ session.getChannel(evt.getChannelId()).setLocalTransactional();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
index 311eb8add9..46bac52e78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
@@ -67,7 +67,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this);
+ getManagedObjectRegistry().registerObject(this);
}
catch (JMException e)
{
@@ -75,11 +75,16 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
}
}
+ protected ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return ApplicationRegistry.getInstance().getManagedObjectRegistry();
+ }
+
public void unregister() throws AMQException
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this);
+ getManagedObjectRegistry().unregisterObject(this);
}
catch (JMException e)
{
@@ -91,6 +96,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
{
return getObjectInstanceName() + "[" + getType() + "]";
}
+
/**
* Created the ObjectName as per the JMX Specs
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index ed74263596..9ca11abb56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -35,10 +35,10 @@ import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CopyOnWriteArrayList;
public class AMQMinaProtocolSession implements AMQProtocolSession,
ProtocolVersionList,
@@ -65,16 +66,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private AMQShortString _contextKey;
+ private VirtualHost _virtualHost;
+
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
- private final QueueRegistry _queueRegistry;
-
- private final ExchangeRegistry _exchangeRegistry;
-
private AMQCodecFactory _codecFactory;
private AMQProtocolSessionMBean _managedObject;
@@ -93,6 +92,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private byte _major;
private byte _minor;
private FieldTable _clientProperties;
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
public ManagedObject getManagedObject()
{
@@ -100,23 +101,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory)
throws AMQException
{
- _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
+
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
@@ -124,8 +125,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_minaProtocolSession = session;
session.setAttachment(this);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
@@ -461,6 +461,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
+ for(Task task : _taskList)
+ {
+ task.doTask(this);
+ }
}
}
@@ -556,4 +560,27 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return _minaProtocolSession.getRemoteAddress();
}
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 10e23caac3..474714680b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -53,41 +53,26 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
- /**
- * The registry of all queues. This is passed to frame listeners when frame
- * events occur.
- */
- private final QueueRegistry _queueRegistry;
+ private final IApplicationRegistry _applicationRegistry;
- /**
- * The registry of all exchanges. This is passed to frame listeners when frame
- * events occur.
- */
- private final ExchangeRegistry _exchangeRegistry;
private boolean _useSSL;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
- IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
-
- _queueRegistry = registry.getQueueRegistry();
- _exchangeRegistry = registry.getExchangeRegistry();
- _logger.debug("AMQPFastProtocolHandler created");
+ this(ApplicationRegistry.getInstance(applicationRegistryInstance));
}
- public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry)
+ public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _applicationRegistry = applicationRegistry;
_logger.debug("AMQPFastProtocolHandler created");
}
protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
{
- this(handler._queueRegistry, handler._exchangeRegistry);
+ this(handler._applicationRegistry);
}
public void sessionCreated(IoSession protocolSession) throws Exception
@@ -95,7 +80,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
SessionUtil.initialize(protocolSession);
final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory);
+ createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created");
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
@@ -120,9 +105,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
/**
* Separated into its own, protected, method to allow easier reuse
*/
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
- new AMQMinaProtocolSession(session, queues, exchanges, codec);
+ new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
}
public void sessionOpened(IoSession protocolSession) throws Exception
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
index ff1316f704..07c153bfe8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
@@ -42,8 +42,7 @@ public class AMQPProtocolProvider
public AMQPProtocolProvider()
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
- _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(),
- registry.getExchangeRegistry());
+ _handler = new AMQPFastProtocolHandler(registry);
}
public AMQPFastProtocolHandler getHandler()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index a1249723ee..ee7e46eba4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import javax.security.sasl.SaslServer;
@@ -32,6 +33,13 @@ import javax.security.sasl.SaslServer;
public interface AMQProtocolSession extends AMQProtocolWriter
{
+
+
+ public static interface Task
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException;
+ }
+
/**
* Called when a protocol data block is received
* @param message the data block that has been received
@@ -126,4 +134,13 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void setClientProperties(FieldTable clientProperties);
Object getClientIdentifier();
+
+ VirtualHost getVirtualHost();
+
+ void setVirtualHost(VirtualHost virtualHost);
+
+ void addSessionCloseTask(Task task);
+
+ void removeSessionCloseTask(Task task);
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index df494915a3..b5fec39626 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -93,6 +94,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
return _session.getIOSession().getRemoteAddress().toString();
}
+ public ManagedObject getParentObject()
+ {
+ return _session.getVirtualHost().getManagedObject();
+ }
+
public Long getWrittenBytes()
{
return _session.getIOSession().getWrittenBytes();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 18b3adc635..709dd28ad5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -30,11 +30,14 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AMQQueue implements Managable, Comparable
{
+
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -95,6 +99,12 @@ public class AMQQueue implements Managable, Comparable
private final AtomicBoolean _isExclusive = new AtomicBoolean();
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+ private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
/**
* Manages message delivery.
@@ -102,11 +112,6 @@ public class AMQQueue implements Managable, Comparable
private final DeliveryManager _deliveryMgr;
/**
- * The queue registry with which this queue is registered.
- */
- private final QueueRegistry _queueRegistry;
-
- /**
* Used to track bindings to exchanges so that on deletion they can easily
* be cancelled.
*/
@@ -119,6 +124,9 @@ public class AMQQueue implements Managable, Comparable
private final AMQQueueMBean _managedObject;
+ private final VirtualHost _virtualHost;
+
+
/**
* max allowed size(KB) of a single message
*/
@@ -145,59 +153,26 @@ public class AMQQueue implements Managable, Comparable
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory());
- }
-
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory)
+ boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory);
+ this(name, durable, owner, autoDelete, virtualHost,
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory());
}
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery,
- SubscriptionFactory subscriptionFactory)
- throws AMQException
- {
-
- this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory);
- }
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
-
- this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(),
- new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
- SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory);
- }
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
+ boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
throws AMQException
{
- this(name, durable, owner, autoDelete, queueRegistry,
+ this(name, durable, owner, autoDelete, virtualHost,
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
+ boolean autoDelete, VirtualHost virtualHost,
Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
throws AMQException
{
@@ -205,18 +180,20 @@ public class AMQQueue implements Managable, Comparable
{
throw new IllegalArgumentException("Queue name must not be null");
}
- if (queueRegistry == null)
+ if (virtualHost == null)
{
- throw new IllegalArgumentException("Queue registry must not be null");
+ throw new IllegalArgumentException("Virtual Host must not be null");
}
_name = name;
_durable = durable;
_owner = owner;
_autoDelete = autoDelete;
- _queueRegistry = queueRegistry;
+ _virtualHost = virtualHost;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
@@ -492,10 +469,18 @@ public class AMQQueue implements Managable, Comparable
public void delete() throws AMQException
{
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _queueRegistry.unregisterQueue(_name);
- _managedObject.unregister();
+ if(!_deleted.getAndSet(true))
+ {
+ _subscribers.queueDeleted(this);
+ _bindings.deregister();
+ _virtualHost.getQueueRegistry().unregisterQueue(_name);
+ _managedObject.unregister();
+ for(Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+ _deleteTaskList.clear();
+ }
}
protected void autodelete() throws AMQException
@@ -620,6 +605,24 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.performGet(session, channel, acks);
}
-
+ public QueueRegistry getQueueRegistry()
+ {
+ return _virtualHost.getQueueRegistry();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public static interface Task
+ {
+ public void doTask(AMQQueue queue) throws AMQException;
+ }
+
+ public void addQueueDeleteTask(Task task)
+ {
+ _deleteTaskList.add(task);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 012b3600ca..ab67012b19 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -20,7 +20,9 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.Main;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -28,11 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
import javax.management.openmbean.*;
-import javax.management.JMException;
-import javax.management.Notification;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.OperationsException;
+import javax.management.*;
import javax.management.monitor.MonitorNotification;
import java.util.List;
import java.util.ArrayList;
@@ -73,6 +71,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
+
+ public ManagedObject getParentObject()
+ {
+ return _queue.getVirtualHost().getManagedObject();
+ }
+
static
{
try
@@ -373,6 +377,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
return _messageList;
}
+//
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// String objNameString = super.getObjectName().toString();
+//
+// return new ObjectName(objNameString);
+// }
+
/**
* returns Notifications sent by this MBean.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index 8ab26def74..084612ca41 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
import java.util.concurrent.ConcurrentMap;
@@ -30,8 +31,16 @@ public class DefaultQueueRegistry implements QueueRegistry
{
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
- public DefaultQueueRegistry()
+ private final VirtualHost _virtualHost;
+
+ public DefaultQueueRegistry(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public VirtualHost getVirtualHost()
{
+ return _virtualHost;
}
public void registerQueue(AMQQueue queue) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index bfbaf27c84..c5f235f1b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -21,11 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
public interface QueueRegistry
{
+ VirtualHost getVirtualHost();
+
void registerQueue(AMQQueue queue) throws AMQException;
void unregisterQueue(AMQShortString name) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 48331843e5..0630d4f39f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.registry;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
import java.util.Iterator;
@@ -38,7 +39,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static Map _instanceMap = new HashMap();
+ private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>();
@@ -62,20 +63,13 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
synchronized (ApplicationRegistry.class)
{
- Iterator keyIterator = _instanceMap.keySet().iterator();
+ Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
while (keyIterator.hasNext())
{
- int key = (Integer) keyIterator.next();
- IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key);
-
- if ((instance != null))
- {
- if (instance.getMessageStore() != null)
- {
- instance.getMessageStore().close();
- }
- }
+ IApplicationRegistry instance = keyIterator.next();
+
+ instance.close();
}
}
}
@@ -118,7 +112,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
try
{
- ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close();
+ _instanceMap.get(instanceID).close();
}
catch (Exception e)
{
@@ -143,7 +137,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public static IApplicationRegistry getInstance(int instanceID)
{
- IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID);
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
if (instance == null)
{
@@ -168,6 +162,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ public void close() throws Exception
+ {
+ for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ virtualHost.close();
+ }
+ }
+
public Configuration getConfiguration()
{
return _configuration;
@@ -193,6 +195,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return instance;
}
+
+
public static void setDefaultApplicationRegistry(String clazz)
{
_APPLICATION_REGISTRY = clazz;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 1eb490d6fb..790421b497 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -38,22 +38,26 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.SASLAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.ByteBuffer;
import java.io.File;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
private ManagedObjectRegistry _managedObjectRegistry;
private AuthenticationManager _authenticationManager;
- private MessageStore _messageStore;
+ private VirtualHostRegistry _virtualHostRegistry;
+
+
+ private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>();
+
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
@@ -91,11 +95,19 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
initialiseManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _virtualHostRegistry = new VirtualHostRegistry();
_authenticationManager = new SASLAuthenticationManager();
- initialiseMessageStore();
+
+ initialiseVirtualHosts();
+ }
+
+ private void initialiseVirtualHosts() throws Exception
+ {
+ for(String name : getVirtualHostNames())
+ {
+
+ _virtualHostRegistry.registerVirtualHost(new VirtualHost(name,getConfiguration().subset("virtualhosts.virtualhost."+name)));
+ }
}
private void initialiseManagedObjectRegistry()
@@ -111,34 +123,10 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
}
}
- private void initialiseMessageStore() throws Exception
- {
- String messageStoreClass = _configuration.getString("store.class");
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
- {
- throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(getQueueRegistry(), "store", _configuration);
- }
-
- public QueueRegistry getQueueRegistry()
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
+ return _virtualHostRegistry;
}
public ManagedObjectRegistry getManagedObjectRegistry()
@@ -151,8 +139,8 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
return _authenticationManager;
}
- public MessageStore getMessageStore()
+ public Collection<String> getVirtualHostNames()
{
- return _messageStore;
- }
+ return getConfiguration().getList("virtualhosts.virtualhost.name");
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index cd664f9a4b..703aed69d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -26,8 +26,12 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
+import java.util.Collection;
+
public interface IApplicationRegistry
{
/**
@@ -35,7 +39,9 @@ public interface IApplicationRegistry
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
*/
- void initialise() throws Exception;
+ void initialise() throws Exception;
+
+ void close() throws Exception;
/**
* This gets access to a "configured object". A configured object has fields populated from a the configuration
@@ -54,15 +60,11 @@ public interface IApplicationRegistry
*/
Configuration getConfiguration();
- QueueRegistry getQueueRegistry();
-
- ExchangeRegistry getExchangeRegistry();
-
- ExchangeFactory getExchangeFactory();
-
ManagedObjectRegistry getManagedObjectRegistry();
AuthenticationManager getAuthenticationManager();
- MessageStore getMessageStore();
+ Collection<String> getVirtualHostNames();
+
+ VirtualHostRegistry getVirtualHostRegistry();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 81ce704026..7b8ba1d9cc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
import java.util.HashMap;
@@ -46,8 +47,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final QueueRegistry _queueRegistry;
- private final ExchangeRegistry _exchangeRegistry;
+
+ private final VirtualHostRegistry _virtualHostRegistry;
private final AMQProtocolSession _protocolSession;
/**
* The current state
@@ -63,15 +64,15 @@ public class AMQStateManager implements AMQMethodListener
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+
+ public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
+ this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _virtualHostRegistry = virtualHostRegistry;
_protocolSession = protocolSession;
_currentState = initial;
if (register)
@@ -176,7 +177,7 @@ public class AMQStateManager implements AMQMethodListener
checkChannel(evt, _protocolSession);
- handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
+ handler.methodReceived(this, evt);
return true;
}
return false;
@@ -241,4 +242,14 @@ public class AMQStateManager implements AMQMethodListener
{
_stateListeners.remove(listener);
}
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
index 56323258b7..99d5d7fe88 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
@@ -25,6 +25,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -34,7 +35,5 @@ import org.apache.qpid.framing.AMQMethodBody;
*/
public interface StateAwareMethodListener <B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<B> evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQMethodEvent<B> evt) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 98a4c3f6e7..eaaffa2dce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.List;
@@ -67,7 +68,7 @@ public class MemoryMessageStore implements MessageStore
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
}
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
configure(base, config);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index c9c7045402..8daad0e5e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -35,13 +36,13 @@ public interface MessageStore
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
- * @param queueRegistry the registry of queues to be used by this store
+ * @param virtualHost the virtual host using by this store
* @param base the base element identifier from which all configuration items are relative. For example, if the base
* element is "store", the all elements used by concrete classes will be "store.foo" etc.
* @param config the apache commons configuration object
* @throws Exception if an error occurs that means the store is unable to configure itself
*/
- void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception;
+ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 2e77f33363..e9a3a3d048 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -33,24 +33,23 @@ import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.NullAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Arrays;
public class NullApplicationRegistry extends ApplicationRegistry
{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
private ManagedObjectRegistry _managedObjectRegistry;
private AuthenticationManager _authenticationManager;
- private MessageStore _messageStore;
+ private VirtualHostRegistry _virtualHostRegistry;
public NullApplicationRegistry()
@@ -60,15 +59,16 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _configuration.addProperty("store.class","org.apache.qpid.server.store.MemoryMessageStore");
+
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _virtualHostRegistry = new VirtualHostRegistry();
+ VirtualHost dummyHost = new VirtualHost("test",getConfiguration());
+ _virtualHostRegistry.registerVirtualHost(dummyHost);
_authenticationManager = new NullAuthenticationManager();
- _messageStore = new MemoryMessageStore();
- ((MemoryMessageStore)_messageStore).configure();
_configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+
}
public Configuration getConfiguration()
@@ -76,20 +76,6 @@ public class NullApplicationRegistry extends ApplicationRegistry
return _configuration;
}
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
public ManagedObjectRegistry getManagedObjectRegistry()
{
@@ -101,9 +87,15 @@ public class NullApplicationRegistry extends ApplicationRegistry
return _authenticationManager;
}
- public MessageStore getMessageStore()
+ public Collection<String> getVirtualHostNames()
+ {
+ String[] hosts = {"test"};
+ return Arrays.asList( hosts );
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _messageStore;
+ return _virtualHostRegistry;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
new file mode 100644
index 0000000000..2c888caac1
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.queue.ManagedQueue;
+import org.apache.qpid.server.exchange.ManagedExchange;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of an Exchange.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the exchange.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
new file mode 100644
index 0000000000..15bad19e58
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.management.*;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import javax.management.NotCompliantMBeanException;
+
+public class VirtualHost
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from
+ * management intrerface for exchanges. Any implementaion of an
+ * Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+
+
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ _virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+
+ initialiseMessageStore(hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public void close() throws Exception
+ {
+ if(_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
new file mode 100644
index 0000000000..25f67c1cf3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class VirtualHostRegistry
+{
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+ public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+ {
+ if(_registry.containsKey(host.getName()))
+ {
+ throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+ }
+ _registry.put(host.getName(),host);
+ }
+
+ public VirtualHost getVirtualHost(String name)
+ {
+ return _registry.get(name);
+ }
+
+ public Collection<VirtualHost> getVirtualHosts()
+ {
+ return new ArrayList<VirtualHost>(_registry.values());
+ }
+}