summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/etc/config.xml35
-rw-r--r--java/broker/etc/virtualhosts.xml12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java107
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java193
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java4
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java14
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java54
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java18
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java20
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java16
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java17
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java36
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java34
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java13
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java7
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java22
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java61
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java2
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java1
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java50
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java14
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java55
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java130
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java1
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java12
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java3
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java13
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java16
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java9
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java21
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java25
143 files changed, 1540 insertions, 791 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 61e0e55138..779a434332 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -79,6 +79,37 @@
</mechanisms>
</sasl>
</security>
+ <virtualhosts>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+ <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> -->
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <environment-path>localhost-store</environment-path>
+ </store>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
<heartbeat>
<delay>0</delay>
<timeoutFactor>2.0</timeoutFactor>
@@ -86,8 +117,6 @@
<queue>
<auto_register>true</auto_register>
</queue>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml
index de6a8c0682..50cddb5661 100644
--- a/java/broker/etc/virtualhosts.xml
+++ b/java/broker/etc/virtualhosts.xml
@@ -21,7 +21,17 @@
-->
<virtualhosts>
<virtualhost>
- <path>/development</path>
+ <path>localhost</path>
+ <bind>direct://amq.direct//queue</bind>
+ <bind>direct://amq.direct//ping</bind>
+ </virtualhost>
+ <virtualhost>
+ <path>development</path>
+ <bind>direct://amq.direct//queue</bind>
+ <bind>direct://amq.direct//ping</bind>
+ </virtualhost>
+ <virtualhost>
+ <path>test</path>
<bind>direct://amq.direct//queue</bind>
<bind>direct://amq.direct//ping</bind>
</virtualhost>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 6f93a14469..2e6293081d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -50,20 +51,28 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
private final ExchangeFactory _exchangeFactory;
private final MessageStore _messageStore;
+ private final VirtualHost.VirtualHostMBean _virtualHostMBean;
+
@MBeanConstructor("Creates the Broker Manager MBean")
- public AMQBrokerManagerMBean() throws JMException
+ public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException
{
super(ManagedBroker.class, ManagedBroker.TYPE);
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
- _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
- _messageStore = ApplicationRegistry.getInstance().getMessageStore();
+
+ _virtualHostMBean = virtualHostMBean;
+ VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
+
+
+
+
+ _queueRegistry = virtualHost.getQueueRegistry();
+ _exchangeRegistry = virtualHost.getExchangeRegistry();
+ _messageStore = virtualHost.getMessageStore();
+ _exchangeFactory = virtualHost.getExchangeFactory();
}
public String getObjectInstanceName()
{
- return this.getClass().getName();
+ return _virtualHostMBean.getVirtualHost().getName();
}
/**
@@ -144,7 +153,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
- queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, _queueRegistry);
+ queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
@@ -157,6 +166,11 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
}
}
+ private VirtualHost getVirtualHost()
+ {
+ return _virtualHostMBean.getVirtualHost();
+ }
+
/**
* Deletes the queue from queue registry and persistant storage.
*
@@ -183,11 +197,17 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
}
}
- public ObjectName getObjectName() throws MalformedObjectNameException
+ public ManagedObject getParentObject()
{
- StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
- objectName.append(":type=").append(getType());
-
- return new ObjectName(objectName.toString());
+ return _virtualHostMBean;
}
+
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
+// objectName.append(".").append(getVirtualHost().getName());
+// objectName.append(":type=").append(getType());
+//
+// return new ObjectName(objectName.toString());
+// }
} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index ffd25de0b4..c45d1ad2c2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -62,7 +62,7 @@ import java.util.StringTokenizer;
* Main entry point for AMQPD.
*
*/
-public class Main implements ProtocolVersionList, Managable
+public class Main implements ProtocolVersionList
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -70,7 +70,8 @@ public class Main implements ProtocolVersionList, Managable
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
- private AMQBrokerManagerMBean _mbean = null;
+
+ private static Main _instance;
protected static class InitException extends Exception
{
@@ -265,7 +266,6 @@ public class Main implements ProtocolVersionList, Managable
}
bind(port, connectorConfig);
- createAndRegisterBrokerMBean();
}
protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
@@ -368,7 +368,7 @@ public class Main implements ProtocolVersionList, Managable
public static void main(String[] args)
{
- new Main(args);
+ _instance = new Main(args);
}
private byte[] parseIP(String address) throws Exception
@@ -430,21 +430,4 @@ public class Main implements ProtocolVersionList, Managable
}
}
- private void createAndRegisterBrokerMBean() throws AMQException
- {
- try
- {
- _mbean = new AMQBrokerManagerMBean();
- _mbean.register();
- }
- catch (JMException ex)
- {
- throw new AMQException("Exception occured in creating AMQBrokerManager MBean");
- }
- }
-
- public ManagedObject getManagedObject()
- {
- return _mbean;
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
index 5e3ac03ba7..379da94aa3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java
@@ -37,14 +37,15 @@ public class Configurator
{
private static final Logger _logger = Logger.getLogger(Configurator.class);
+
/**
- * Configure a given instance using the application configuration. Note that superclasses are <b>not</b>
+ * Configure a given instance using the supplied configuration. Note that superclasses are <b>not</b>
* currently configured but this could easily be added if required.
* @param instance the instance to configure
+ * @param config the configuration to use to configure the object
*/
- public static void configure(Object instance)
+ public static void configure(Object instance, Configuration config)
{
- final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
for (Field f : instance.getClass().getDeclaredFields())
{
@@ -56,6 +57,18 @@ public class Configurator
}
}
+
+
+ /**
+ * Configure a given instance using the application configuration. Note that superclasses are <b>not</b>
+ * currently configured but this could easily be added if required.
+ * @param instance the instance to configure
+ */
+ public static void configure(Object instance)
+ {
+ configure(instance, ApplicationRegistry.getInstance().getConfiguration());
+ }
+
private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation)
{
Class fieldClass = f.getType();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 7e807304c8..361a21b284 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.log4j.Logger;
@@ -113,6 +114,7 @@ public class VirtualHostConfiguration
}
_logger.info("VirtualHost:'" + prop + "'");
+ String virtualHost = prop.toString();
prop = _config.getProperty(path + "." + XML_BIND);
if (prop instanceof Collection)
@@ -121,16 +123,16 @@ public class VirtualHostConfiguration
_logger.debug("Number of Bindings: " + bindings);
for (int dest = 0; dest < bindings; dest++)
{
- loadBinding(path, dest);
+ loadBinding(virtualHost, path, dest);
}
}
else
{
- loadBinding(path, -1);
+ loadBinding(virtualHost,path, -1);
}
}
- private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
+ private void loadBinding(String virtualHost, String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
{
String path = rootpath + "." + XML_BIND;
if (index != -1)
@@ -146,7 +148,7 @@ public class VirtualHostConfiguration
try
{
- bind(binding);
+ bind(virtualHost, binding);
}
catch (AMQException amqe)
{
@@ -155,7 +157,7 @@ public class VirtualHostConfiguration
}
}
- private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+ private void bind(String virtualHostName, AMQBindingURL binding) throws AMQException, ConfigurationException
{
AMQShortString queueName = binding.getQueueName();
@@ -169,9 +171,10 @@ public class VirtualHostConfiguration
}
//Get references to Broker Registries
- QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
- MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
- ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+ VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore messageStore = virtualHost.getMessageStore();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
synchronized (queueRegistry)
{
@@ -184,7 +187,7 @@ public class VirtualHostConfiguration
queue = new AMQQueue(queueName,
Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
null /* These queues will have no owner */,
- false /* Therefore autodelete makes no sence */, queueRegistry);
+ false /* Therefore autodelete makes no sence */, virtualHost);
if (queue.isDurable())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 94c792c358..caafb83568 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -25,6 +25,10 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
@@ -34,10 +38,14 @@ public abstract class AbstractExchange implements Exchange, Managable
{
private AMQShortString _name;
+
+
protected boolean _durable;
protected String _exchangeType;
protected int _ticket;
+ private VirtualHost _virtualHost;
+
protected ExchangeMBean _exchangeMbean;
/**
@@ -57,6 +65,11 @@ public abstract class AbstractExchange implements Exchange, Managable
super(ManagedExchange.class, ManagedExchange.TYPE);
}
+ public ManagedObject getParentObject()
+ {
+ return _virtualHost.getManagedObject();
+ }
+
public String getObjectInstanceName()
{
return _name.toString();
@@ -87,13 +100,17 @@ public abstract class AbstractExchange implements Exchange, Managable
return _autoDelete;
}
- public ObjectName getObjectName() throws MalformedObjectNameException
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// String objNameString = super.getObjectName().toString();
+// objNameString = objNameString + ",VirtualHost="+ _virtualHost.getName() +",ExchangeType=" + _exchangeType;
+// return new ObjectName(objNameString);
+// }
+
+ protected ManagedObjectRegistry getManagedObjectRegistry()
{
- String objNameString = super.getObjectName().toString();
- objNameString = objNameString + ",ExchangeType=" + _exchangeType;
- return new ObjectName(objNameString);
+ return ApplicationRegistry.getInstance().getManagedObjectRegistry();
}
-
} // End of MBean class
public AMQShortString getName()
@@ -108,8 +125,9 @@ public abstract class AbstractExchange implements Exchange, Managable
*/
protected abstract ExchangeMBean createMBean() throws AMQException;
- public void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+ public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
{
+ _virtualHost = host;
_name = name;
_durable = durable;
_autoDelete = autoDelete;
@@ -151,4 +169,13 @@ public abstract class AbstractExchange implements Exchange, Managable
return _exchangeMbean;
}
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return getVirtualHost().getQueueRegistry();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 77f9819048..d77f1b6c5a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -36,9 +37,11 @@ public class DefaultExchangeFactory implements ExchangeFactory
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
private Map<AMQShortString, Class<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, Class<? extends Exchange>>();
+ private final VirtualHost _host;
- public DefaultExchangeFactory()
+ public DefaultExchangeFactory(VirtualHost host)
{
+ _host = host;
_exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class);
_exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class);
_exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class);
@@ -59,7 +62,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
try
{
Exchange e = exchClass.newInstance();
- e.initialise(exchange, durable, ticket, autoDelete);
+ e.initialise(_host, exchange, durable, ticket, autoDelete);
return e;
}
catch (InstantiationException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index fcd6e8fdad..8862bd5104 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -109,7 +109,7 @@ public class DestNameExchange extends AbstractExchange
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index d1b35451b5..c38b9fe9b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -110,7 +111,7 @@ public class DestWildExchange extends AbstractExchange
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 366dcb11b3..c012a1c1c9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -25,13 +25,14 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public interface Exchange
{
AMQShortString getName();
AMQShortString getType();
- void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+ void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
boolean isDurable();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 2e7457e4a6..82039c345f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -44,7 +44,7 @@ public class FanoutExchange extends AbstractExchange
private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
- @MBeanConstructor("Creates an MBean for AMQ direct exchange")
+ @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
public FanoutExchangeMBean() throws JMException
{
super();
@@ -86,7 +86,7 @@ public class FanoutExchange extends AbstractExchange
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 93933cd88d..3a49ff586b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -150,7 +150,7 @@ public class HeadersExchange extends AbstractExchange
*/
public void createNewBinding(String queueName, String binding) throws JMException
{
- AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index e12dd4a9db..b2b1b0a716 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
@@ -46,10 +47,10 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicAckBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
+ AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+
if (_log.isDebugEnabled())
{
_log.debug("Ack received on channel " + evt.getChannelId());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index a23a29941f..2bbb696e90 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
@@ -45,10 +46,10 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
{
+ AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
final BasicCancelBody body = evt.getMethod();
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 721001b454..bc695431c9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -28,6 +28,8 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -53,14 +55,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
BasicConsumeBody body = evt.getMethod();
final int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
+ VirtualHost vHost = session.getVirtualHost();
if (channel == null)
{
_log.error("Channel " + channelId + " not found");
@@ -69,7 +72,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 1eb3152973..51b585ecc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -10,6 +10,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
@@ -31,12 +33,13 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<BasicGetBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicGetBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
BasicGetBody body = evt.getMethod();
final int channelId = evt.getChannelId();
+ VirtualHost vHost = session.getVirtualHost();
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -46,7 +49,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 0ef30be265..78a246d80e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -28,6 +28,8 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -53,10 +55,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicPublishBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicPublishBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
final BasicPublishBody body = evt.getMethod();
if (_log.isDebugEnabled())
@@ -70,7 +72,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
}
- Exchange e = exchangeRegistry.getExchange(body.exchange);
+ VirtualHost vHost = session.getVirtualHost();
+ Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
// if the exchange does not exist we raise a channel exception
if (e == null)
{
@@ -82,8 +85,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// The partially populated BasicDeliver frame plus the received route body
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
- channel.setPublishFrame(body, protocolSession);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.setPublishFrame(body, session);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 2bab4cac5c..325e5226ad 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
@@ -40,9 +41,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
return _instance;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
- AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index f3e0cc3a63..a247ee33ce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.framing.BasicRecoverBody;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -42,18 +43,18 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
{
- _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
if (channel == null)
{
throw new AMQException("Unknown channel " + evt.getChannelId());
}
BasicRecoverBody body = evt.getMethod();
- channel.resend(protocolSession, body.requeue);
+ channel.resend(session, body.requeue);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index a24ecd9b01..d46a4f6424 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
{
@@ -47,18 +48,17 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- protocolSession.closeChannel(evt.getChannelId());
+ session.closeChannel(evt.getChannelId());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
index 81a5371829..be11d5e939 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -45,9 +46,7 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index d92a4eed6a..62f7ed4b78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -48,13 +48,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ChannelFlowBody body = evt.getMethod();
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
@@ -64,6 +63,6 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
(byte)8, (byte)0, // AMQP version (major, minor)
body.active); // active
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 6ea9dfa595..5cd3f8ac89 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -47,18 +48,18 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
- {
- IApplicationRegistry registry = ApplicationRegistry.getInstance();
- final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
- exchangeRegistry);
- protocolSession.addChannel(channel);
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+
+ final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(),
+ virtualHost.getExchangeRegistry());
+ session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 0fe25a1c89..8bc849d5cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -47,16 +47,15 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
_logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + protocolSession);
+ body.replyText + " for " + session);
try
{
- protocolSession.closeSession();
+ session.closeSession();
}
catch (Exception e)
{
@@ -66,6 +65,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
index bcdd86d2ef..c10a731cc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
@@ -46,17 +46,16 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
//todo should this not do more than just log the method?
_logger.info("Received Connection-close-ok");
try
{
stateManager.changeState(AMQState.CONNECTION_CLOSED);
- protocolSession.closeSession();
+ session.closeSession();
}
catch (Exception e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 8056ff9adb..88717c446b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -27,11 +27,13 @@ import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
{
@@ -51,28 +53,39 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
return new AMQShortString(Long.toString(System.currentTimeMillis()));
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ConnectionOpenBody body = evt.getMethod();
-
+ String virtualHostName = String.valueOf(body.virtualHost);
- //todo //FIXME The virtual host must be validated by the server for the connection to open-ok
- // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (protocolSession.getContextKey() == null)
+ VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
+
+ if(virtualHost == null)
{
- protocolSession.setContextKey(generateClientID());
+ throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName);
}
+ else
+ {
+ session.setVirtualHost( virtualHost );
+
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- body.virtualHost); // knownHosts
- stateManager.changeState(AMQState.CONNECTION_OPEN);
- protocolSession.writeFrame(response);
+
+ // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
+ if (session.getContextKey() == null)
+ {
+ session.setContextKey(generateClientID());
+ }
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.virtualHost);
+ stateManager.changeState(AMQState.CONNECTION_OPEN);
+ session.writeFrame(response);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index d33874b727..11cbaade30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -54,14 +54,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ConnectionSecureOkBody body = evt.getMethod();
AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
- SaslServer ss = protocolSession.getSaslServer();
+ SaslServer ss = session.getSaslServer();
if (ss == null)
{
throw new AMQException("No SASL context set up in session");
@@ -84,8 +83,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
AMQConstant.NOT_ALLOWED.getName()); // replyText
- protocolSession.writeFrame(close);
- disposeSaslServer(protocolSession);
+ session.writeFrame(close);
+ disposeSaslServer(session);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
@@ -101,8 +100,8 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
Integer.MAX_VALUE, // channelMax
ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeFrame(tune);
- disposeSaslServer(protocolSession);
+ session.writeFrame(tune);
+ disposeSaslServer(session);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
@@ -112,7 +111,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
(byte)8, (byte)0, // AMQP version (major, minor)
authResult.challenge); // challenge
- protocolSession.writeFrame(challenge);
+ session.writeFrame(challenge);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 6cb384f081..b45a017166 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -60,10 +60,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionStartOkBody body = evt.getMethod();
_logger.info("SASL Mechanism selected: " + body.mechanism);
_logger.info("Locale selected: " + body.locale);
@@ -73,15 +72,15 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
SaslServer ss = null;
try
{
- ss = authMgr.createSaslServer(String.valueOf(body.mechanism), protocolSession.getLocalFQDN());
- protocolSession.setSaslServer(ss);
+ ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+ session.setSaslServer(ss);
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
//save clientProperties
- if (protocolSession.getClientProperties() == null)
+ if (session.getClientProperties() == null)
{
- protocolSession.setClientProperties(body.clientProperties);
+ session.setClientProperties(body.clientProperties);
}
switch (authResult.status)
@@ -100,7 +99,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
Integer.MAX_VALUE, // channelMax
getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeFrame(tune);
+ session.writeFrame(tune);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
@@ -110,12 +109,12 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
(byte)8, (byte)0, // AMQP version (major, minor)
authResult.challenge); // challenge
- protocolSession.writeFrame(challenge);
+ session.writeFrame(challenge);
}
}
catch (SaslException e)
{
- disposeSaslServer(protocolSession);
+ disposeSaslServer(session);
throw new AMQException("SASL error: " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
index 960643325a..020e93b7d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
@@ -42,16 +42,15 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
ConnectionTuneOkBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.initHeartbeats(body.heartbeat);
+ session.initHeartbeats(body.heartbeat);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 596e6bf332..30da1398b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/**
* @author Apache Software Foundation
@@ -61,10 +62,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
byte major = (byte)8;
@@ -79,7 +82,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
throw new AMQException("Exchange exchange must not be null");
}
- Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
AMQFrame response;
if (exchange == null)
{
@@ -112,6 +115,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
}
else
{
+
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
@@ -194,6 +198,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
" to exchange " + exchangeName)); // replyText
}
}
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 84e9a4e3f4..03af56ff14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
{
@@ -50,17 +51,19 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
return _instance;
}
- private final ExchangeFactory exchangeFactory;
+
private ExchangeDeclareHandler()
{
- exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
final ExchangeDeclareBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
@@ -106,7 +109,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 2faba57e04..fc2a4b1fd5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
@@ -45,10 +46,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
ExchangeDeleteBody body = evt.getMethod();
try
{
@@ -57,7 +60,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
catch (ExchangeInUseException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 45180d0cb6..0a23b9bd86 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -50,15 +51,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
final QueueBindBody body = evt.getMethod();
final AMQQueue queue;
if (body.queue == null)
{
- queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue();
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
if (queue == null)
{
throw new AMQException("No default queue defined on channel and queue was null");
@@ -94,7 +99,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index b62fe22b89..fdf98bb49e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.configuration.Configured;
@@ -37,7 +36,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,18 +57,21 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
private final AtomicInteger _counter = new AtomicInteger();
- private final MessageStore _store;
+
protected QueueDeclareHandler()
{
Configurator.configure(this);
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeclareBody body = evt.getMethod();
// if we aren't given a queue name, we create one which we return to the client
@@ -94,10 +96,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- queue = createQueue(body, queueRegistry, protocolSession);
+ queue = createQueue(body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _store.createQueue(queue);
+ store.createQueue(queue);
}
queueRegistry.registerQueue(queue);
if (autoRegister)
@@ -109,14 +111,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
// todo - constant
throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
}
//set this as the default queue on the channel:
- protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
if (!body.nowait)
@@ -130,7 +132,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
- protocolSession.writeFrame(response);
+ session.writeFrame(response);
}
}
@@ -144,10 +146,43 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
throws AMQException
{
+ final QueueRegistry registry = virtualHost.getQueueRegistry();
AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- return new AMQQueue(body.queue, body.durable, owner, body.autoDelete || (!body.durable && body.exclusive), registry);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ final AMQShortString queueName = queue.getName();
+
+ if(body.exclusive && !body.durable)
+ {
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ if(registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
+
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+
+
+ }
+
+ return queue;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 245d86a7a6..3f0833de41 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -24,18 +24,14 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -47,7 +43,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
private final boolean _failIfNotFound;
- private final MessageStore _store;
public QueueDeleteHandler()
{
@@ -57,12 +52,16 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
public QueueDeleteHandler(boolean failIfNotFound)
{
_failIfNotFound = failIfNotFound;
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -71,7 +70,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
@@ -96,7 +95,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
else
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName());
+ store.removeQueue(queue.getName());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index ad63d36351..c8341ee5b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -9,6 +9,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -34,8 +35,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -52,7 +57,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 68b0c584eb..6f6b8b6f3c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,10 +48,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxCommitBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -58,13 +58,13 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
_log.debug("Commit received on channel " + evt.getChannelId());
}
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
- channel.processReturns(protocolSession);
+ session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ channel.processReturns(session);
}
catch(AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index d71c93a6c6..31a28d2275 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
{
@@ -44,20 +45,20 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
try{
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.rollback();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession, false);
+ channel.resend(session, false);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index 7d66fa9d5c..30b70869f9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
@@ -43,14 +44,14 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<TxSelectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setLocalTransactional();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ session.getChannel(evt.getChannelId()).setLocalTransactional();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
index 311eb8add9..46bac52e78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
@@ -67,7 +67,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this);
+ getManagedObjectRegistry().registerObject(this);
}
catch (JMException e)
{
@@ -75,11 +75,16 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
}
}
+ protected ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return ApplicationRegistry.getInstance().getManagedObjectRegistry();
+ }
+
public void unregister() throws AMQException
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this);
+ getManagedObjectRegistry().unregisterObject(this);
}
catch (JMException e)
{
@@ -91,6 +96,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
{
return getObjectInstanceName() + "[" + getType() + "]";
}
+
/**
* Created the ObjectName as per the JMX Specs
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index ed74263596..9ca11abb56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -35,10 +35,10 @@ import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CopyOnWriteArrayList;
public class AMQMinaProtocolSession implements AMQProtocolSession,
ProtocolVersionList,
@@ -65,16 +66,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private AMQShortString _contextKey;
+ private VirtualHost _virtualHost;
+
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
- private final QueueRegistry _queueRegistry;
-
- private final ExchangeRegistry _exchangeRegistry;
-
private AMQCodecFactory _codecFactory;
private AMQProtocolSessionMBean _managedObject;
@@ -93,6 +92,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private byte _major;
private byte _minor;
private FieldTable _clientProperties;
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
public ManagedObject getManagedObject()
{
@@ -100,23 +101,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory)
throws AMQException
{
- _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
+
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
@@ -124,8 +125,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_minaProtocolSession = session;
session.setAttachment(this);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
@@ -461,6 +461,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
+ for(Task task : _taskList)
+ {
+ task.doTask(this);
+ }
}
}
@@ -556,4 +560,27 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return _minaProtocolSession.getRemoteAddress();
}
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 10e23caac3..474714680b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -53,41 +53,26 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
- /**
- * The registry of all queues. This is passed to frame listeners when frame
- * events occur.
- */
- private final QueueRegistry _queueRegistry;
+ private final IApplicationRegistry _applicationRegistry;
- /**
- * The registry of all exchanges. This is passed to frame listeners when frame
- * events occur.
- */
- private final ExchangeRegistry _exchangeRegistry;
private boolean _useSSL;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
- IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
-
- _queueRegistry = registry.getQueueRegistry();
- _exchangeRegistry = registry.getExchangeRegistry();
- _logger.debug("AMQPFastProtocolHandler created");
+ this(ApplicationRegistry.getInstance(applicationRegistryInstance));
}
- public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry)
+ public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _applicationRegistry = applicationRegistry;
_logger.debug("AMQPFastProtocolHandler created");
}
protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
{
- this(handler._queueRegistry, handler._exchangeRegistry);
+ this(handler._applicationRegistry);
}
public void sessionCreated(IoSession protocolSession) throws Exception
@@ -95,7 +80,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
SessionUtil.initialize(protocolSession);
final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory);
+ createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created");
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
@@ -120,9 +105,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
/**
* Separated into its own, protected, method to allow easier reuse
*/
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
- new AMQMinaProtocolSession(session, queues, exchanges, codec);
+ new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
}
public void sessionOpened(IoSession protocolSession) throws Exception
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
index ff1316f704..07c153bfe8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
@@ -42,8 +42,7 @@ public class AMQPProtocolProvider
public AMQPProtocolProvider()
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
- _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(),
- registry.getExchangeRegistry());
+ _handler = new AMQPFastProtocolHandler(registry);
}
public AMQPFastProtocolHandler getHandler()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index a1249723ee..ee7e46eba4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import javax.security.sasl.SaslServer;
@@ -32,6 +33,13 @@ import javax.security.sasl.SaslServer;
public interface AMQProtocolSession extends AMQProtocolWriter
{
+
+
+ public static interface Task
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException;
+ }
+
/**
* Called when a protocol data block is received
* @param message the data block that has been received
@@ -126,4 +134,13 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void setClientProperties(FieldTable clientProperties);
Object getClientIdentifier();
+
+ VirtualHost getVirtualHost();
+
+ void setVirtualHost(VirtualHost virtualHost);
+
+ void addSessionCloseTask(Task task);
+
+ void removeSessionCloseTask(Task task);
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index df494915a3..b5fec39626 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -93,6 +94,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
return _session.getIOSession().getRemoteAddress().toString();
}
+ public ManagedObject getParentObject()
+ {
+ return _session.getVirtualHost().getManagedObject();
+ }
+
public Long getWrittenBytes()
{
return _session.getIOSession().getWrittenBytes();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 18b3adc635..709dd28ad5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -30,11 +30,14 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AMQQueue implements Managable, Comparable
{
+
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -95,6 +99,12 @@ public class AMQQueue implements Managable, Comparable
private final AtomicBoolean _isExclusive = new AtomicBoolean();
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+ private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
/**
* Manages message delivery.
@@ -102,11 +112,6 @@ public class AMQQueue implements Managable, Comparable
private final DeliveryManager _deliveryMgr;
/**
- * The queue registry with which this queue is registered.
- */
- private final QueueRegistry _queueRegistry;
-
- /**
* Used to track bindings to exchanges so that on deletion they can easily
* be cancelled.
*/
@@ -119,6 +124,9 @@ public class AMQQueue implements Managable, Comparable
private final AMQQueueMBean _managedObject;
+ private final VirtualHost _virtualHost;
+
+
/**
* max allowed size(KB) of a single message
*/
@@ -145,59 +153,26 @@ public class AMQQueue implements Managable, Comparable
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory());
- }
-
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory)
+ boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory);
+ this(name, durable, owner, autoDelete, virtualHost,
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory());
}
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery,
- SubscriptionFactory subscriptionFactory)
- throws AMQException
- {
-
- this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory);
- }
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
-
- this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(),
- new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
- SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, queueRegistry,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory);
- }
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
+ boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
throws AMQException
{
- this(name, durable, owner, autoDelete, queueRegistry,
+ this(name, durable, owner, autoDelete, virtualHost,
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, QueueRegistry queueRegistry,
+ boolean autoDelete, VirtualHost virtualHost,
Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
throws AMQException
{
@@ -205,18 +180,20 @@ public class AMQQueue implements Managable, Comparable
{
throw new IllegalArgumentException("Queue name must not be null");
}
- if (queueRegistry == null)
+ if (virtualHost == null)
{
- throw new IllegalArgumentException("Queue registry must not be null");
+ throw new IllegalArgumentException("Virtual Host must not be null");
}
_name = name;
_durable = durable;
_owner = owner;
_autoDelete = autoDelete;
- _queueRegistry = queueRegistry;
+ _virtualHost = virtualHost;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
@@ -492,10 +469,18 @@ public class AMQQueue implements Managable, Comparable
public void delete() throws AMQException
{
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _queueRegistry.unregisterQueue(_name);
- _managedObject.unregister();
+ if(!_deleted.getAndSet(true))
+ {
+ _subscribers.queueDeleted(this);
+ _bindings.deregister();
+ _virtualHost.getQueueRegistry().unregisterQueue(_name);
+ _managedObject.unregister();
+ for(Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+ _deleteTaskList.clear();
+ }
}
protected void autodelete() throws AMQException
@@ -620,6 +605,24 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.performGet(session, channel, acks);
}
-
+ public QueueRegistry getQueueRegistry()
+ {
+ return _virtualHost.getQueueRegistry();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public static interface Task
+ {
+ public void doTask(AMQQueue queue) throws AMQException;
+ }
+
+ public void addQueueDeleteTask(Task task)
+ {
+ _deleteTaskList.add(task);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 012b3600ca..ab67012b19 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -20,7 +20,9 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.Main;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -28,11 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
import javax.management.openmbean.*;
-import javax.management.JMException;
-import javax.management.Notification;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.OperationsException;
+import javax.management.*;
import javax.management.monitor.MonitorNotification;
import java.util.List;
import java.util.ArrayList;
@@ -73,6 +71,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
+
+ public ManagedObject getParentObject()
+ {
+ return _queue.getVirtualHost().getManagedObject();
+ }
+
static
{
try
@@ -373,6 +377,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
return _messageList;
}
+//
+// public ObjectName getObjectName() throws MalformedObjectNameException
+// {
+// String objNameString = super.getObjectName().toString();
+//
+// return new ObjectName(objNameString);
+// }
+
/**
* returns Notifications sent by this MBean.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index 8ab26def74..084612ca41 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
import java.util.concurrent.ConcurrentMap;
@@ -30,8 +31,16 @@ public class DefaultQueueRegistry implements QueueRegistry
{
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
- public DefaultQueueRegistry()
+ private final VirtualHost _virtualHost;
+
+ public DefaultQueueRegistry(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public VirtualHost getVirtualHost()
{
+ return _virtualHost;
}
public void registerQueue(AMQQueue queue) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index bfbaf27c84..c5f235f1b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -21,11 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
public interface QueueRegistry
{
+ VirtualHost getVirtualHost();
+
void registerQueue(AMQQueue queue) throws AMQException;
void unregisterQueue(AMQShortString name) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 48331843e5..0630d4f39f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.registry;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
import java.util.Iterator;
@@ -38,7 +39,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static Map _instanceMap = new HashMap();
+ private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>();
@@ -62,20 +63,13 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
synchronized (ApplicationRegistry.class)
{
- Iterator keyIterator = _instanceMap.keySet().iterator();
+ Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
while (keyIterator.hasNext())
{
- int key = (Integer) keyIterator.next();
- IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(key);
-
- if ((instance != null))
- {
- if (instance.getMessageStore() != null)
- {
- instance.getMessageStore().close();
- }
- }
+ IApplicationRegistry instance = keyIterator.next();
+
+ instance.close();
}
}
}
@@ -118,7 +112,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
try
{
- ((IApplicationRegistry) _instanceMap.get(instanceID)).getMessageStore().close();
+ _instanceMap.get(instanceID).close();
}
catch (Exception e)
{
@@ -143,7 +137,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public static IApplicationRegistry getInstance(int instanceID)
{
- IApplicationRegistry instance = (IApplicationRegistry) _instanceMap.get(instanceID);
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
if (instance == null)
{
@@ -168,6 +162,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ public void close() throws Exception
+ {
+ for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ virtualHost.close();
+ }
+ }
+
public Configuration getConfiguration()
{
return _configuration;
@@ -193,6 +195,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return instance;
}
+
+
public static void setDefaultApplicationRegistry(String clazz)
{
_APPLICATION_REGISTRY = clazz;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 1eb490d6fb..790421b497 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -38,22 +38,26 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.SASLAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.ByteBuffer;
import java.io.File;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
private ManagedObjectRegistry _managedObjectRegistry;
private AuthenticationManager _authenticationManager;
- private MessageStore _messageStore;
+ private VirtualHostRegistry _virtualHostRegistry;
+
+
+ private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>();
+
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
@@ -91,11 +95,19 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
initialiseManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _virtualHostRegistry = new VirtualHostRegistry();
_authenticationManager = new SASLAuthenticationManager();
- initialiseMessageStore();
+
+ initialiseVirtualHosts();
+ }
+
+ private void initialiseVirtualHosts() throws Exception
+ {
+ for(String name : getVirtualHostNames())
+ {
+
+ _virtualHostRegistry.registerVirtualHost(new VirtualHost(name,getConfiguration().subset("virtualhosts.virtualhost."+name)));
+ }
}
private void initialiseManagedObjectRegistry()
@@ -111,34 +123,10 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
}
}
- private void initialiseMessageStore() throws Exception
- {
- String messageStoreClass = _configuration.getString("store.class");
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
- {
- throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(getQueueRegistry(), "store", _configuration);
- }
-
- public QueueRegistry getQueueRegistry()
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
+ return _virtualHostRegistry;
}
public ManagedObjectRegistry getManagedObjectRegistry()
@@ -151,8 +139,8 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
return _authenticationManager;
}
- public MessageStore getMessageStore()
+ public Collection<String> getVirtualHostNames()
{
- return _messageStore;
- }
+ return getConfiguration().getList("virtualhosts.virtualhost.name");
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index cd664f9a4b..703aed69d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -26,8 +26,12 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
+import java.util.Collection;
+
public interface IApplicationRegistry
{
/**
@@ -35,7 +39,9 @@ public interface IApplicationRegistry
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
*/
- void initialise() throws Exception;
+ void initialise() throws Exception;
+
+ void close() throws Exception;
/**
* This gets access to a "configured object". A configured object has fields populated from a the configuration
@@ -54,15 +60,11 @@ public interface IApplicationRegistry
*/
Configuration getConfiguration();
- QueueRegistry getQueueRegistry();
-
- ExchangeRegistry getExchangeRegistry();
-
- ExchangeFactory getExchangeFactory();
-
ManagedObjectRegistry getManagedObjectRegistry();
AuthenticationManager getAuthenticationManager();
- MessageStore getMessageStore();
+ Collection<String> getVirtualHostNames();
+
+ VirtualHostRegistry getVirtualHostRegistry();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 81ce704026..7b8ba1d9cc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.log4j.Logger;
import java.util.HashMap;
@@ -46,8 +47,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final QueueRegistry _queueRegistry;
- private final ExchangeRegistry _exchangeRegistry;
+
+ private final VirtualHostRegistry _virtualHostRegistry;
private final AMQProtocolSession _protocolSession;
/**
* The current state
@@ -63,15 +64,15 @@ public class AMQStateManager implements AMQMethodListener
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+
+ public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
+ this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _virtualHostRegistry = virtualHostRegistry;
_protocolSession = protocolSession;
_currentState = initial;
if (register)
@@ -176,7 +177,7 @@ public class AMQStateManager implements AMQMethodListener
checkChannel(evt, _protocolSession);
- handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
+ handler.methodReceived(this, evt);
return true;
}
return false;
@@ -241,4 +242,14 @@ public class AMQStateManager implements AMQMethodListener
{
_stateListeners.remove(listener);
}
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
index 56323258b7..99d5d7fe88 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
@@ -25,6 +25,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -34,7 +35,5 @@ import org.apache.qpid.framing.AMQMethodBody;
*/
public interface StateAwareMethodListener <B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<B> evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQMethodEvent<B> evt) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 98a4c3f6e7..eaaffa2dce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.List;
@@ -67,7 +68,7 @@ public class MemoryMessageStore implements MessageStore
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
}
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
configure(base, config);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index c9c7045402..8daad0e5e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -35,13 +36,13 @@ public interface MessageStore
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
- * @param queueRegistry the registry of queues to be used by this store
+ * @param virtualHost the virtual host using by this store
* @param base the base element identifier from which all configuration items are relative. For example, if the base
* element is "store", the all elements used by concrete classes will be "store.foo" etc.
* @param config the apache commons configuration object
* @throws Exception if an error occurs that means the store is unable to configure itself
*/
- void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception;
+ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 2e77f33363..e9a3a3d048 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -33,24 +33,23 @@ import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.NullAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Arrays;
public class NullApplicationRegistry extends ApplicationRegistry
{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
private ManagedObjectRegistry _managedObjectRegistry;
private AuthenticationManager _authenticationManager;
- private MessageStore _messageStore;
+ private VirtualHostRegistry _virtualHostRegistry;
public NullApplicationRegistry()
@@ -60,15 +59,16 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _configuration.addProperty("store.class","org.apache.qpid.server.store.MemoryMessageStore");
+
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _virtualHostRegistry = new VirtualHostRegistry();
+ VirtualHost dummyHost = new VirtualHost("test",getConfiguration());
+ _virtualHostRegistry.registerVirtualHost(dummyHost);
_authenticationManager = new NullAuthenticationManager();
- _messageStore = new MemoryMessageStore();
- ((MemoryMessageStore)_messageStore).configure();
_configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+
}
public Configuration getConfiguration()
@@ -76,20 +76,6 @@ public class NullApplicationRegistry extends ApplicationRegistry
return _configuration;
}
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
public ManagedObjectRegistry getManagedObjectRegistry()
{
@@ -101,9 +87,15 @@ public class NullApplicationRegistry extends ApplicationRegistry
return _authenticationManager;
}
- public MessageStore getMessageStore()
+ public Collection<String> getVirtualHostNames()
+ {
+ String[] hosts = {"test"};
+ return Arrays.asList( hosts );
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
{
- return _messageStore;
+ return _virtualHostRegistry;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
new file mode 100644
index 0000000000..2c888caac1
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.queue.ManagedQueue;
+import org.apache.qpid.server.exchange.ManagedExchange;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of an Exchange.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the exchange.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
new file mode 100644
index 0000000000..15bad19e58
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.management.*;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import javax.management.NotCompliantMBeanException;
+
+public class VirtualHost
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from
+ * management intrerface for exchanges. Any implementaion of an
+ * Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+
+
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ _virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+
+ initialiseMessageStore(hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public void close() throws Exception
+ {
+ if(_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
new file mode 100644
index 0000000000..25f67c1cf3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class VirtualHostRegistry
+{
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+ public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+ {
+ if(_registry.containsKey(host.getName()))
+ {
+ throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+ }
+ _registry.put(host.getName(),host);
+ }
+
+ public VirtualHost getVirtualHost(String name)
+ {
+ return _registry.get(name);
+ }
+
+ public Collection<VirtualHost> getVirtualHosts()
+ {
+ return new ArrayList<VirtualHost>(_registry.values());
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index a155117a7f..c82afd3906 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -156,7 +156,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
+ (clientName == null ? "" : clientName) + "/" +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index c134c2093b..b32a0ffdc4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -114,6 +114,10 @@ public class AMQConnectionURL implements ConnectionURL
if (virtualHost != null && (!virtualHost.equals("")))
{
+ if(virtualHost.startsWith("/"))
+ {
+ virtualHost = virtualHost.substring(1);
+ }
setVirtualHost(virtualHost);
}
else
diff --git a/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java b/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java
index d89bc4a771..f59b36166a 100644
--- a/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java
+++ b/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java
@@ -46,7 +46,7 @@ public class TestManyConnections extends TestCase
long startTime = System.currentTimeMillis();
for (int i = 0; i < count; i++)
{
- createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "/test");
+ createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "test");
}
long endTime = System.currentTimeMillis();
_log.info("Time to create " + count + " connections: " + (endTime - startTime) +
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 39a2e9c627..6b03dd32e8 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -76,7 +76,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
- env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
_context = factory.getInitialContext(env);
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index b99593aaa5..01c3d30314 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -70,7 +70,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
Hashtable<String, String> env = new Hashtable<String, String>();
- env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
_context = factory.getInitialContext(env);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 234b4c8a67..2d69b4fb82 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -49,7 +49,7 @@ public class RecoverTest extends TestCase
public void testRecoverResendsMsgs() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
@@ -57,7 +57,7 @@ public class RecoverTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -106,7 +106,7 @@ public class RecoverTest extends TestCase
public void testRecoverResendsMsgsAckOnEarlier() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
@@ -114,7 +114,7 @@ public class RecoverTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -170,7 +170,7 @@ public class RecoverTest extends TestCase
public void testAcknowledgePerConsumer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
@@ -178,7 +178,7 @@ public class RecoverTest extends TestCase
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
MessageProducer producer2 = producerSession.createProducer(queue2);
@@ -209,7 +209,7 @@ public class RecoverTest extends TestCase
public void testRecoverInAutoAckListener() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
index 59be38f0dd..cf5b5c76e5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
@@ -48,7 +48,7 @@ public class BytesMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
index 5af55d6625..d1e90e7bcd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -54,7 +54,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index bc2def1c64..0d283aa0d9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -56,7 +56,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
TransportConnection.createVMBroker(1);
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
index 1e9de221d4..66d82a991e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -41,7 +41,7 @@ public class MultipleConnectionTest extends TestCase
Receiver(String broker, AMQDestination dest, int sessions) throws Exception
{
- this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions);
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest, sessions);
}
Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
@@ -72,7 +72,7 @@ public class MultipleConnectionTest extends TestCase
Publisher(String broker, AMQDestination dest) throws Exception
{
- this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest);
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest);
}
Publisher(AMQConnection connection, AMQDestination dest) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
index 3f726ae5ab..dc1aadaa6c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -50,7 +50,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
TransportConnection.createVMBroker(1);
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index 7423a3d8f0..d0126e1917 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -53,7 +53,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
super.setUp();
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
index f4814795c4..38b33f4b18 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -51,11 +51,11 @@ public class PubSubTwoConnectionTest extends TestCase
public void testTwoConnections() throws Exception
{
Topic topic = new AMQTopic("MyTopic");
- Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "/test_path");
+ Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageProducer producer = session1.createProducer(topic);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "/test_path");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer = session2.createConsumer(topic);
con2.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
index 302551b05c..1db62cffa9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
@@ -48,7 +48,7 @@ public class ReceiveTest extends TestCase
{
createVMBroker();
String broker = _connectionString;
- init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path"));
+ init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "test"));
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
index 27a2ccb32e..fe15e151a3 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -51,7 +51,7 @@ public class SelectorTest extends TestCase implements MessageListener
{
super.setUp();
TransportConnection.createVMBroker(1);
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
index 726c7e39d7..cce02accd8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
@@ -43,7 +43,7 @@ public class SessionStartTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index 81481bc94d..b50cd39780 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -52,7 +52,7 @@ public class TextMessageTest extends TestCase implements MessageListener
super.setUp();
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index c4b60be1d1..db4e18a4a1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -42,7 +42,7 @@ public class AMQConnectionTest extends TestCase
{
super.setUp();
TransportConnection.createVMBroker(1);
- _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
+ _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test");
_topic = new AMQTopic("mytopic");
_queue = new AMQQueue("myqueue");
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
index 67c4f1dd6b..b01a129bf2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -46,7 +46,7 @@ public class AMQSessionTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
+ _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test");
_topic = new AMQTopic("mytopic");
_queue = new AMQQueue("myqueue");
_session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index d84d66e26d..05d83be47f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -65,7 +65,7 @@ public class ChannelCloseOkTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
- _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path");
+ _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test");
_destination1 = new AMQQueue("q1", true);
_destination2 = new AMQQueue("q2", true);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
index 0b3ed931f8..7a665daeb3 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -49,7 +49,7 @@ public class CloseWithBlockingReceiveTest extends TestCase
public void testReceiveReturnsNull() throws Exception
{
final Connection connection = new AMQConnection("vm://:1", "guest", "guest",
- "fred", "/test");
+ "fred", "test");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new AMQTopic("banana"));
connection.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 0bfe8dbddf..8441799990 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -55,7 +55,7 @@ public class ConnectionTest extends TestCase
{
try
{
- new AMQConnection(_broker, "guest", "guest", "fred", "/test");
+ new AMQConnection(_broker, "guest", "guest", "fred", "test");
}
catch (Exception e)
{
@@ -115,7 +115,7 @@ public class ConnectionTest extends TestCase
public void testClientIdCannotBeChanged() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
try
{
connection.setClientID("someClientId");
@@ -130,7 +130,7 @@ public class ConnectionTest extends TestCase
public void testClientIdIsPopulatedAutomatically() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
- null, "/test");
+ null, "test");
assertNotNull(connection.getClientID());
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 8e67f97787..c6dee1d9bf 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -33,14 +33,14 @@ public class ConnectionURLTest extends TestCase
public void testFailoverURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 2);
@@ -60,14 +60,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportUsernamePasswordURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -80,14 +80,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportUsernameBlankPasswordURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:@/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://ritchiem:@/test?brokerlist='tcp://localhost:5672'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals(""));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -100,7 +100,7 @@ public class ConnectionURLTest extends TestCase
public void testFailedURLNullPassword()
{
- String url = "amqp://ritchiem@/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://ritchiem@/test?brokerlist='tcp://localhost:5672'";
try
{
@@ -125,7 +125,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/test"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -140,7 +140,7 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportWithClientURLURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@clientname/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://guest:guest@clientname/test?brokerlist='tcp://localhost:5672'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
@@ -148,7 +148,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getClientName().equals("clientname"));
@@ -164,14 +164,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransport1OptionURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim'";
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -187,14 +187,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportDefaultedBroker() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='localhost'";
+ String url = "amqp://guest:guest@/test?brokerlist='localhost'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -209,14 +209,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportDefaultedBrokerWithPort() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='localhost:1234'";
+ String url = "amqp://guest:guest@/test?brokerlist='localhost:1234'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -231,14 +231,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportDefaultedBrokerWithIP() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='127.0.0.1'";
+ String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -253,7 +253,7 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='127.0.0.1:1234'";
+ String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'";
// ConnectionURL connectionurl = new AMQConnectionURL(url);
//
@@ -276,14 +276,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportMultiOptionURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -301,14 +301,14 @@ public class ConnectionURLTest extends TestCase
public void testSinglevmURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/messages?brokerlist='vm://:2'";
+ String url = "amqp://guest:guest@/test?brokerlist='vm://:2'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/messages"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -322,14 +322,14 @@ public class ConnectionURLTest extends TestCase
public void testFailoverVMURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/temp?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 2);
@@ -369,7 +369,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getUsername().equals("user"));
assertTrue(connectionurl.getPassword().equals(""));
- assertTrue(connectionurl.getVirtualHost().equals("/test"));
+ assertTrue(connectionurl.getVirtualHost().equals("test"));
assertTrue(connectionurl.getBrokerCount() == 1);
}
@@ -428,7 +428,7 @@ public class ConnectionURLTest extends TestCase
String url = "amqp://guest:guest@/t.-_+!=:?brokerlist='tcp://localhost:5672'";
AMQConnectionURL connection = new AMQConnectionURL(url);
- assertTrue(connection.getVirtualHost().equals("/t.-_+!=:"));
+ assertTrue(connection.getVirtualHost().equals("t.-_+!=:"));
}
public void testCheckDefaultPort() throws URLSyntaxException
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index 6c2c684362..f12400c7b1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -87,7 +87,7 @@ public class Client implements MessageListener
static AMQConnection connect(String broker) throws Exception
{
- return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+ return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
}
public static void main(String[] argv) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
index a1c64e2246..58f9c6fc19 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
@@ -73,7 +73,7 @@ public class Service implements MessageListener
static AMQConnection connect(String broker) throws Exception
{
- return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+ return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
}
// public static void main(String[] argv) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
index bbd1870168..0710605db9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -54,7 +54,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path");
+ connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test");
destination = new AMQQueue(randomize("LatencyTest"), true);
session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
index 6c064e3853..b6c539d91c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -28,7 +28,7 @@ public class TemporaryQueueTest extends TestCase
protected Connection createConnection() throws AMQException, URLSyntaxException
{
return new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
}
public void testTempoaryQueue() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
index c9240e9be7..7cbd4e8bdd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
@@ -51,7 +51,7 @@ public class TopicPublisherCloseTest extends TestCase
public void testAllMethodsThrowAfterConnectionClose() throws Exception
{
- AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "/test_path");
+ AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test");
Topic destination1 = new AMQTopic("t1");
TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
index 23e3b9cc88..1f53d7de65 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
@@ -40,12 +40,12 @@ public class JMSDestinationTest extends TestCase
public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 4f0ca6d3aa..7d83d19d74 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -59,7 +59,7 @@ public class StreamMessageTest extends TestCase
public void testStreamMessageEOF() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -72,7 +72,7 @@ public class StreamMessageTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -113,7 +113,7 @@ public class StreamMessageTest extends TestCase
public void testModifyReceivedMessageExpandsBuffer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue("testQ");
MessageConsumer consumer = consumerSession.createConsumer(queue);
@@ -135,7 +135,7 @@ public class StreamMessageTest extends TestCase
}
}
});
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
con.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 8263e7f126..7e645f1a26 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -55,7 +55,7 @@ public class DurableSubscriptionTest extends TestCase
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -96,7 +96,7 @@ public class DurableSubscriptionTest extends TestCase
public void testDurability() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
index 4ffb3e8459..c4acf15a58 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
@@ -48,7 +48,7 @@ public class TopicPublisherTest extends TestCase
public void testUnidentifiedProducer() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(null);
MessageConsumer consumer1 = session1.createConsumer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 8a6e279142..84c7a61a56 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -51,7 +51,7 @@ public class TopicSessionTest extends TestCase
public void testTopicSubscriptionUnsubscription() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
@@ -97,7 +97,7 @@ public class TopicSessionTest extends TestCase
{
AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
@@ -112,7 +112,7 @@ public class TopicSessionTest extends TestCase
{
session1.close();
con.close();
- con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
con.start();
session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
publisher = session1.createPublisher(null);
@@ -134,11 +134,11 @@ public class TopicSessionTest extends TestCase
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic3");
- AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
- AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
@@ -149,7 +149,7 @@ public class TopicSessionTest extends TestCase
assertNotNull(tm);
con2.close();
publisher.publish(session1.createTextMessage("Hello2"));
- con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
@@ -163,7 +163,7 @@ public class TopicSessionTest extends TestCase
public void testTextMessageCreation() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic4");
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -202,7 +202,7 @@ public class TopicSessionTest extends TestCase
public void testSendingSameMessage() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
@@ -224,7 +224,7 @@ public class TopicSessionTest extends TestCase
public void testTemporaryTopic() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index bbad5862a0..18b72e5538 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -61,7 +61,7 @@ public class TransactedTest extends TestCase
queue1 = new AMQQueue("Q1", false);
queue2 = new AMQQueue("Q2", false);
- con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
session = con.createSession(true, 0);
consumer1 = session.createConsumer(queue1);
//Dummy just to create the queue.
@@ -70,7 +70,7 @@ public class TransactedTest extends TestCase
producer2 = session.createProducer(queue2);
con.start();
- prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test");
+ prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
@@ -81,7 +81,7 @@ public class TransactedTest extends TestCase
prepProducer1.send(prepSession.createTextMessage("B"));
prepProducer1.send(prepSession.createTextMessage("C"));
- testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
testConsumer2 = testSession.createConsumer(queue2);
@@ -142,7 +142,7 @@ public class TransactedTest extends TestCase
public void testResendsMsgsAfterSessionClose() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue3 = new AMQQueue("Q3", false);
@@ -150,7 +150,7 @@ public class TransactedTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue3);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
index 352928b121..80f9ef62b1 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
@@ -46,7 +46,7 @@ class ClusterBuilder
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null);
}
private MethodHandlerFactory getHandlerFactory()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index c1306b4c13..8419ec5668 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.net.InetSocketAddress;
@@ -55,13 +56,8 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
}
public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
- {
- this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
- }
-
- public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
- {
- super(queueRegistry, exchangeRegistry);
+ {
+ super(registry);
ClusterBuilder builder = new ClusterBuilder(address);
_groupMgr = builder.getGroupManager();
_handlers = builder.getHandlerRegistry();
@@ -74,9 +70,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
+ new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession));
}
void connect(String join) throws Exception
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index 04c5f7b451..fc635cc7ea 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -24,6 +24,8 @@ import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -37,11 +39,11 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
{
private MemberHandle _peer;
- public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+ public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
{
- super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+ super(session, virtualHostRegistry, codecFactory, stateManager);
// super(session, queueRegistry, exchangeRegistry, codecFactory);
}
@@ -66,7 +68,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
AMQChannel channel = super.getChannel(channelId);
if (isPeerSession() && channel == null)
{
- channel = new OneUseChannel(channelId);
+ channel = new OneUseChannel(channelId, getVirtualHost());
addChannel(channel);
}
return channel;
@@ -102,18 +104,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
*/
private class OneUseChannel extends AMQChannel
{
- public OneUseChannel(int channelId)
- throws AMQException
- {
- this(channelId, ApplicationRegistry.getInstance());
- }
-
- public OneUseChannel(int channelId, IApplicationRegistry registry)
+ public OneUseChannel(int channelId, VirtualHost virtualHost)
throws AMQException
{
super(channelId,
- registry.getMessageStore(),
- registry.getExchangeRegistry());
+ virtualHost.getMessageStore(),
+ virtualHost.getExchangeRegistry());
}
protected void routeCurrentMessage() throws AMQException
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 27d5629f27..03b0dc7f2e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.util.HashMap;
import java.util.Map;
@@ -43,23 +44,20 @@ class ServerHandlerRegistry extends AMQStateManager
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
+ super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(queueRegistry, exchangeRegistry, protocolSession);
+ this(virtualHostRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(queueRegistry, exchangeRegistry, protocolSession);
+ this(virtualHostRegistry, protocolSession);
init(factory);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
index c4107a435b..86710e8a31 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -54,19 +55,19 @@ public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends Clust
}
}
- protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.peer(stateMgr, queues, exchanges, session, evt);
+ handler.peer(stateMgr, evt);
}
}
- protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.client(stateMgr, queues, exchanges, session, evt);
+ handler.client(stateMgr, evt);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index 27d3e28b88..c9f6dbfb37 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -79,22 +80,22 @@ class ChannelQueueManager
private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
set(evt.getChannelId(), evt.getMethod().queue);
}
}
private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -104,11 +105,11 @@ class ChannelQueueManager
}
private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -119,11 +120,11 @@ class ChannelQueueManager
private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
index 971fa5393b..faab99b0f6 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
@@ -32,18 +32,20 @@ import org.apache.qpid.AMQException;
public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
{
- public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
+ AMQProtocolSession session = stateMgr.getProtocolSession();
+
if (ClusteredProtocolSession.isPeerSession(session))
{
- peer(stateMgr, queues, exchanges, session, evt);
+ peer(stateMgr, evt);
}
else
{
- client(stateMgr, queues, exchanges, session, evt);
+ client(stateMgr, evt);
}
}
- protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
- protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index 483096f29d..cd897671cc 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -135,19 +135,15 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
{
- _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
+ _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession()));
}
}
private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
{
_groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -155,9 +151,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
{
_groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -165,9 +159,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
{
_groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -175,9 +167,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
{
ClusterMembershipBody body = evt.getMethod();
_groupMgr.handleMembershipAnnouncement(new String(body.members));
@@ -186,16 +176,14 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterPingBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException
{
MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
_groupMgr.handlePing(peer, evt.getMethod().load);
if (evt.getMethod().responseRequired)
{
evt.getMethod().load = _loadTable.getLocalLoad();
- session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
+ stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
}
}
}
@@ -207,12 +195,12 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
super(ConnectionOpenMethodHandler.getInstance());
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt)
{
AMQShortString capabilities = evt.getMethod().capabilities;
if (ClusterCapability.contains(capabilities))
{
- ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
+ ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities));
}
else
{
@@ -228,9 +216,9 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
super(ConnectionCloseMethodHandler.getInstance());
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt)
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt)
{
- if (!ClusteredProtocolSession.isPeerSession(session))
+ if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession()))
{
_loadTable.decrementLocalLoad();
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
index 7eb3d7291c..a2f62f714b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
@@ -38,18 +38,18 @@ class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListen
_base = base;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- preHandle(stateMgr, session, evt);
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
- postHandle(stateMgr, session, evt);
+ preHandle(stateMgr, evt);
+ _base.methodReceived(stateMgr, evt);
+ postHandle(stateMgr, evt);
}
- void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
index 6b876095a4..f01a8349f2 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.queue.ClusteredQueue;
import org.apache.qpid.server.queue.PrivateQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.RemoteQueueProxy;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class LocalQueueDeclareHandler extends QueueDeclareHandler
{
@@ -51,7 +52,7 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler
return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails());
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException
{
//is it private or shared:
if (body.exclusive)
@@ -61,18 +62,18 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler
//need to get peer from the session...
MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
_logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
- return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, registry);
+ return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost);
}
else
{
_logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
- return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry);
+ return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost);
}
}
else
{
_logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
- return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry);
+ return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
index 7f19569dbc..8b0bb4b127 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
{
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
index 150b707071..447e51ccd9 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
@@ -47,14 +47,14 @@ public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_client = client;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _peer.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _peer.methodReceived(stateMgr, evt);
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _client.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _client.methodReceived(stateMgr, evt);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
index 6668faca65..a669171d3c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
@@ -41,12 +41,11 @@ class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody>
_handler = handler;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
- AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt)
throws AMQException
{
setName(evt.getMethod());//need to set the name before propagating this method
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
index 0699678c9f..f09763e1ad 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
@@ -32,15 +32,21 @@ import org.apache.qpid.server.queue.ClusteredQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody>
{
private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
//By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag);
+ AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag);
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index c58ae291dd..073b13688c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.ClusteredQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/**
* Handles consume requests from other cluster members.
@@ -42,9 +43,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu
{
private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
- AMQQueue queue = queues.getQueue(evt.getMethod().queue);
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue);
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
index 26a4967417..897f8e4fb7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody>
{
@@ -46,17 +47,22 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo
super(groupMgr, base(), policy);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
//only replicate if the queue in question is a shared queue
- if (isShared(queues.getQueue(evt.getMethod().queue)))
+ if (isShared(queueRegistry.getQueue(evt.getMethod().queue)))
{
- super.replicate(stateMgr, queues, exchanges, session, evt);
+ super.replicate(stateManager, evt);
}
else
{
_logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
+ local(stateManager, evt);
_logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod()));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index 84f97e7f59..888fa4e426 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -60,52 +61,51 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_policy = policy;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException
{
- local(stateMgr, queues, exchanges, session, evt);
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ local(stateManager, evt);
_logger.debug(new LogMessage("Handled {0} locally", evt.getMethod()));
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- replicate(stateMgr, queues, exchanges, session, evt);
+ replicate(stateMgr, evt);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
if (_policy == null)
{
//asynch delivery
_groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
+ local(stateMgr, evt);
}
else
{
- Callback callback = new Callback(stateMgr, queues, exchanges, session, evt);
+ Callback callback = new Callback(stateMgr, evt);
_groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback);
}
_logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
}
- protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _base.methodReceived(stateMgr, evt);
}
private class Callback implements GroupResponseHandler
{
private final AMQStateManager _stateMgr;
- private final QueueRegistry _queues;
- private final ExchangeRegistry _exchanges;
- private final AMQProtocolSession _session;
private final AMQMethodEvent<A> _evt;
- Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt)
+ Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt)
{
_stateMgr = stateMgr;
- _queues = queues;
- _exchanges = exchanges;
- _session = session;
_evt = evt;
}
@@ -113,7 +113,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
{
try
{
- local(_stateMgr, _queues, _exchanges, _session, _evt);
+ local(_stateMgr, _evt);
_logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod()));
}
catch (AMQException e)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
index 2561da36a8..8b0c638d63 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
@@ -42,11 +42,11 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho
_primary = check(primary);
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException
{
- _pre.methodReceived(stateMgr, queues, exchanges, session, evt);
- _primary.methodReceived(stateMgr, queues, exchanges, session, evt);
- _post.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _pre.methodReceived(stateMgr, evt);
+ _primary.methodReceived(stateMgr, evt);
+ _post.methodReceived(stateMgr, evt);
}
private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index ad9de8d93f..8ac4b9b2c7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.HashMap;
@@ -73,8 +74,11 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
_localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+
_logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
AMQMethodBody request = evt.getMethod();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 5cf6d5c3ff..19be638051 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -46,22 +47,14 @@ public class ClusteredQueue extends AMQQueue
private final GroupManager _groupMgr;
private final NestedSubscriptionManager _subscriptions;
- public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager());
+ super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager());
_groupMgr = groupMgr;
_subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
}
- public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(),
- new SubscriptionImpl.Factory());
- _groupMgr = groupMgr;
- _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
- }
public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 568de62d1b..95ab34ccf9 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleBodySendable;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.AMQShortString;
@@ -37,21 +38,14 @@ public class PrivateQueue extends AMQQueue
{
private final GroupManager _groupMgr;
- public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- super(name, durable, owner, autoDelete, queueRegistry);
+ super(name, durable, owner, autoDelete, virtualHost);
_groupMgr = groupMgr;
}
- public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
- _groupMgr = groupMgr;
- }
-
protected void autodelete() throws AMQException
{
//delete locally:
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index a6cce05a03..d0a64c7d6f 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.MemberHandle;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.concurrent.Executor;
@@ -43,23 +44,15 @@ public class RemoteQueueProxy extends AMQQueue
private final MemberHandle _target;
private final GroupManager _groupMgr;
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- super(name, durable, owner, autoDelete, queueRegistry);
+ super(name, durable, owner, autoDelete, virtualHost);
_target = target;
_groupMgr = groupMgr;
_groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
}
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
- _target = target;
- _groupMgr = groupMgr;
- _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
- }
public void deliver(AMQMessage msg) throws NoConsumersException
{
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
index 57159f3802..172f1b1790 100644
--- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -39,4 +39,10 @@ public class ExchangeDefaults
public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout");
public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
+
+
+ public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt");
+
+ public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt");
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index b891c914ec..ed6ab63ded 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -531,6 +531,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
return _replyTo == null ? null : _replyTo.toString();
}
+ public AMQShortString getReplyToAsShortString()
+ {
+ decodeIfNecessary();
+ return _replyTo;
+ }
+
+
public void setReplyTo(String replyTo)
{
setReplyTo(replyTo == null ? null : new AMQShortString(replyTo));
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
index 756d404596..8817f6c2c5 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
@@ -32,7 +32,7 @@ public class Constants
public final static String ITEM_TYPE = "type";
public final static String SERVER = "server";
public final static String DOMAIN = "domain";
- public final static String TYPE = "mbeantype";
+ public final static String NODE_TYPE_MBEANTYPE = "mbeantype";
public final static String MBEAN = "mbean";
public final static String ATTRIBUTE = "Attribute";
public final static String ATTRIBUTES = "Attributes";
@@ -45,13 +45,14 @@ public class Constants
public final static String NAVIGATION_ROOT = "Qpid Connections";
public final static String DESCRIPTION = " Description";
-
- public final static String BROKER_MANAGER = "Broker_Manager";
- public final static String QUEUE = "Queue";
- public final static String EXCHANGE = "Exchange";
+
+ public final static String VIRTUAL_HOST = "VirtualHost";
+ public final static String MBEAN_TYPE_BROKER_MANAGER = "VirtualHost.BrokerManager";
+ public final static String MBEAN_TYPE_QUEUE = "VirtualHost.Queue";
+ public final static String MBEAN_TYPE_EXCHANGE = "VirtualHost.Exchange";
public final static String EXCHANGE_TYPE = "ExchangeType";
public final static String[] EXCHANGE_TYPE_VALUES = {"direct", "topic", "headers"};
- public final static String CONNECTION ="Connection";
+ public final static String MBEAN_TYPE_CONNECTION ="Connection";
public final static String ACTION_ADDSERVER = "New Connection";
@@ -87,4 +88,13 @@ public class Constants
public final static int OPERATION_IMPACT_ACTION = 1;
public final static int OPERATION_IMPACT_ACTIONINFO = 2;
public final static int OPERATION_IMPACT_UNKNOWN = 3;
+ public static final String NODE_TYPE_VIRTUAL_HOST = "virtualhost";
+ public static final String NODE_LABEL_QUEUES = "queues";
+ public static final String NODE_LABEL_EXCHANGES = "exchanges";
+ public static final String NODE_LABEL_CONNECTIONS = "connections";
+
+ public static final String NODE_LABEL_VIRTUAL_HOSTS = "virtual hosts";
+ public static final String TAB_LABEL_QUEUES = "queues";
+ public static final String TAB_LABEL_EXCHANGES = "exchanges";
+ public static final String TAB_LABEL_CONNECTIONS = "connections";
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
index 6fbfdcd06f..cc106c445b 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
@@ -23,6 +23,10 @@ package org.apache.qpid.management.ui;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.management.ui.jmx.ClientListener;
import org.apache.qpid.management.ui.model.ManagedAttributeModel;
@@ -33,11 +37,11 @@ public abstract class ServerRegistry
{
private ManagedServer _managedServer = null;
// list of all Connection mbeans
- protected List<ManagedBean> _connections = new ArrayList<ManagedBean>();
+ protected ConcurrentMap<String,List<ManagedBean>> _connections = new ConcurrentHashMap<String,List<ManagedBean>>();
// list of all exchange mbeans
- protected List<ManagedBean> _exchanges = new ArrayList<ManagedBean>();
+ protected ConcurrentMap<String,List<ManagedBean>> _exchanges = new ConcurrentHashMap<String,List<ManagedBean>>();
// list of all queue mbenas
- protected List<ManagedBean> _queues = new ArrayList<ManagedBean>();
+ protected ConcurrentMap<String,List<ManagedBean>> _queues = new ConcurrentHashMap<String,List<ManagedBean>>();
public ServerRegistry()
{
@@ -61,47 +65,68 @@ public abstract class ServerRegistry
protected void addConnectionMBean(ManagedBean mbean)
{
- _connections.add(mbean);
+ String virtualHostName = mbean.getProperty("VirtualHost");
+ _connections.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>());
+ List<ManagedBean> beans = _connections.get(virtualHostName);
+ beans.add(mbean);
}
protected void addExchangeMBean(ManagedBean mbean)
{
- _exchanges.add(mbean);
+ String virtualHostName = mbean.getProperty("VirtualHost");
+ _exchanges.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>());
+ List<ManagedBean> beans = _exchanges.get(virtualHostName);
+ beans.add(mbean);
}
protected void addQueueMBean(ManagedBean mbean)
{
- _queues.add(mbean);
+ String virtualHostName = mbean.getProperty("VirtualHost");
+ _queues.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>());
+ List<ManagedBean> beans = _queues.get(virtualHostName);
+ beans.add(mbean);
}
protected void removeConnectionMBean(ManagedBean mbean)
{
- _connections.remove(mbean);
+ String virtualHostName = mbean.getProperty("VirtualHost");
+ _connections.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>());
+ List<ManagedBean> beans = _connections.get(virtualHostName);
+ beans.remove(mbean);
}
protected void removeExchangeMBean(ManagedBean mbean)
{
- _exchanges.remove(mbean);
+ String virtualHostName = mbean.getProperty("VirtualHost");
+ _exchanges.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>());
+ List<ManagedBean> beans = _exchanges.get(virtualHostName);
+ beans.remove(mbean);
}
protected void removeQueueMBean(ManagedBean mbean)
{
- _queues.remove(mbean);
+ String virtualHostName = mbean.getProperty("VirtualHost");
+ _queues.putIfAbsent(virtualHostName, new ArrayList<ManagedBean>());
+ List<ManagedBean> beans = _queues.get(virtualHostName);
+ beans.remove(mbean);
}
- public List<ManagedBean> getConnections()
+ public List<ManagedBean> getConnections(String virtualHost)
{
- return _connections;
+ _connections.putIfAbsent(virtualHost, new ArrayList<ManagedBean>());
+ return _connections.get(virtualHost);
}
- public List<ManagedBean> getExchanges()
+ public List<ManagedBean> getExchanges(String virtualHost)
{
- return _exchanges;
+ _exchanges.putIfAbsent(virtualHost, new ArrayList<ManagedBean>());
+ return _exchanges.get(virtualHost);
}
- public List<ManagedBean> getQueues()
+ public List<ManagedBean> getQueues(String virtualHost)
{
- return _queues;
+ _queues.putIfAbsent(virtualHost, new ArrayList<ManagedBean>());
+ return _queues.get(virtualHost);
}
public abstract void addManagedObject(ManagedBean key);
@@ -123,11 +148,11 @@ public abstract class ServerRegistry
public abstract OperationDataModel getOperationModel(ManagedBean mbean);
- public abstract String[] getQueueNames();
+ public abstract String[] getQueueNames(String virtualHost);
- public abstract String[] getExchangeNames();
+ public abstract String[] getExchangeNames(String virtualHost);
- public abstract String[] getConnectionNames();
+ public abstract String[] getConnectionNames(String virtualHost);
public abstract List<NotificationObject> getNotifications(ManagedBean mbean);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java
index 00a9ae7653..6a23051a9e 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java
@@ -43,8 +43,6 @@ public class ClientListener implements NotificationListener
public void handleNotification(Notification notification, Object handback)
{
- System.out.println("\nReceived server notification: " + notification);
-
ObjectName objName = null;
String type = notification.getType();
if (MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(type))
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java
index 31b761fcf3..c6ecda4b4c 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientNotificationListener.java
@@ -34,7 +34,6 @@ public class ClientNotificationListener extends ClientListener
public void handleNotification(Notification notification, Object handback)
{
- System.out.println("\nReceived mbean notification: " + notification);
ObjectName objName = (ObjectName)notification.getSource();
//String type = notification.getType();
getServerRegistry().addNotification(objName, notification);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
index c087bd2e72..727e1228f5 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
@@ -120,7 +120,8 @@ public class JMXServerRegistry extends ServerRegistry
}
catch (ListenerNotFoundException ex)
{
- System.out.println(ex.toString());
+ System.err.println(ex);
+ ex.printStackTrace();
}
}
@@ -131,15 +132,15 @@ public class JMXServerRegistry extends ServerRegistry
public void addManagedObject(ManagedBean mbean)
{
- if (Constants.QUEUE.equals(mbean.getType()) && !mbean.getName().startsWith("tmp_"))
+ if (Constants.MBEAN_TYPE_QUEUE.equals(mbean.getType()) && !mbean.getName().startsWith("tmp_"))
{
addQueueMBean(mbean);
}
- else if (Constants.EXCHANGE.equals(mbean.getType()))
+ else if (Constants.MBEAN_TYPE_EXCHANGE.equals(mbean.getType()))
{
addExchangeMBean(mbean);
}
- else if (Constants.CONNECTION.equals(mbean.getType()))
+ else if (Constants.MBEAN_TYPE_CONNECTION.equals(mbean.getType()))
{
addConnectionMBean(mbean);
}
@@ -149,11 +150,11 @@ public class JMXServerRegistry extends ServerRegistry
public void removeManagedObject(ManagedBean mbean)
{
- if (Constants.QUEUE.equals(mbean.getType()))
+ if (Constants.MBEAN_TYPE_QUEUE.equals(mbean.getType()))
removeQueueMBean(mbean);
- else if (Constants.EXCHANGE.equals(mbean.getType()))
+ else if (Constants.MBEAN_TYPE_EXCHANGE.equals(mbean.getType()))
removeExchangeMBean(mbean);
- else if (Constants.CONNECTION.equals(mbean.getType()))
+ else if (Constants.MBEAN_TYPE_CONNECTION.equals(mbean.getType()))
removeConnectionMBean(mbean);
_mbeansMap.remove(mbean.getUniqueName());
@@ -247,7 +248,6 @@ public class JMXServerRegistry extends ServerRegistry
list.add(type);
}
- System.out.println("Subscribed for notification :" + mbean.getUniqueName());
}
public boolean hasSubscribedForNotifications(ManagedBean mbean, String name, String type)
@@ -268,7 +268,6 @@ public class JMXServerRegistry extends ServerRegistry
public void removeNotificationListener(ManagedBean mbean, String name, String type) throws Exception
{
- System.out.println("Removed notification listener :" + mbean.getUniqueName() + name +type);
if (_subscribedNotificationMap.containsKey(mbean.getUniqueName()))
{
HashMap<String, List<String>> map = _subscribedNotificationMap.get(mbean.getUniqueName());
@@ -335,37 +334,40 @@ public class JMXServerRegistry extends ServerRegistry
return _operationModelMap.get(mbean.getUniqueName());
}
- public String[] getQueueNames()
+ public String[] getQueueNames(String virtualHost)
{
- String[] queues = new String[_queues.size()];
+ List<ManagedBean> queues = _queues.get(virtualHost);
+ String[] queueNames = new String[queues.size()];
int i = 0;
- for (ManagedBean mbean : _queues)
+ for (ManagedBean mbean : queues)
{
- queues[i++] = mbean.getName();
+ queueNames[i++] = mbean.getName();
}
- return queues;
+ return queueNames;
}
- public String[] getExchangeNames()
+ public String[] getExchangeNames(String virtualHost)
{
- String[] exchanges = new String[_exchanges.size()];
+ List<ManagedBean> exchanges = _exchanges.get(virtualHost);
+ String[] exchangeNames = new String[exchanges.size()];
int i = 0;
- for (ManagedBean mbean : _exchanges)
+ for (ManagedBean mbean : exchanges)
{
- exchanges[i++] = mbean.getName();
+ exchangeNames[i++] = mbean.getName();
}
- return exchanges;
+ return exchangeNames;
}
- public String[] getConnectionNames()
+ public String[] getConnectionNames(String virtualHost)
{
- String[] connections = new String[_connections.size()];
+ List<ManagedBean> connections = _connections.get(virtualHost);
+ String[] connectionNames = new String[connections.size()];
int i = 0;
- for (ManagedBean mbean : _connections)
+ for (ManagedBean mbean : connections)
{
- connections[i++] = mbean.getName();
+ connectionNames[i++] = mbean.getName();
}
- return connections;
+ return connectionNames;
}
public ClientNotificationListener getNotificationListener()
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java
index 73d56634ec..d8d76058a5 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanTypeTabControl.java
@@ -91,7 +91,7 @@ public class MBeanTypeTabControl
for (int i = 0; i < selectedItems.length; i++)
{
String name = selectedItems[i];;
- if (Constants.QUEUE.equals(_type))
+ if (Constants.MBEAN_TYPE_QUEUE.equals(_type))
{
int endIndex = name.lastIndexOf("(");
name = name.substring(0, endIndex -1);
@@ -231,21 +231,21 @@ public class MBeanTypeTabControl
java.util.List<ManagedBean> list = null;
// populate the map and list with appropriate mbeans
- if (_type.equals(Constants.QUEUE))
+ if (_type.equals(Constants.MBEAN_TYPE_QUEUE) || _type.equals(Constants.NODE_LABEL_QUEUES))
{
- list = serverRegistry.getQueues();
+ list = serverRegistry.getQueues(MBeanView.getVirtualHostName());
items = getQueueItems(list);
_sortBySizeButton.setVisible(true);
}
- else if (_type.equals(Constants.EXCHANGE))
+ else if (_type.equals(Constants.MBEAN_TYPE_EXCHANGE) || _type.equals(Constants.NODE_LABEL_EXCHANGES))
{
- list = serverRegistry.getExchanges();
+ list = serverRegistry.getExchanges(MBeanView.getVirtualHostName());
items = getItems(list);
_sortBySizeButton.setVisible(false);
}
- else if (_type.equals(Constants.CONNECTION))
+ else if (_type.equals(Constants.MBEAN_TYPE_CONNECTION) || _type.equals(Constants.NODE_LABEL_CONNECTIONS))
{
- list = serverRegistry.getConnections();
+ list = serverRegistry.getConnections(MBeanView.getVirtualHostName());
items = getItems(list);
_sortBySizeButton.setVisible(false);
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java
index 62871c4c91..1622c231c6 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java
@@ -74,9 +74,12 @@ public class MBeanView extends ViewPart
// TabFolder to list all the mbeans for a given mbeantype(eg Connection, Queue, Exchange)
private TabFolder typeTabFolder = null;
+ private static String _virtualHostName;
+
+
/*
- * Listener for the selection events in the navigation view
- */
+ * Listener for the selection events in the navigation view
+ */
private class SelectionListenerImpl implements ISelectionListener
{
public void selectionChanged(IWorkbenchPart part, ISelection sel)
@@ -102,11 +105,11 @@ public class MBeanView extends ViewPart
setServer();
try
{
- if (Constants.TYPE.equals(_selectedNode.getType()))
+ if (Constants.NODE_TYPE_MBEANTYPE.equals(_selectedNode.getType()))
{
refreshTypeTabFolder(_selectedNode.getName());
}
- else if (Constants.DOMAIN.equals(_selectedNode.getType()))
+ else if (Constants.NODE_TYPE_VIRTUAL_HOST.equals(_selectedNode.getType()))
{
refreshTypeTabFolder(typeTabFolder.getItem(0));
}
@@ -146,6 +149,19 @@ public class MBeanView extends ViewPart
if (parent != null && parent.getType().equals(Constants.SERVER))
_server = (ManagedServer)parent.getManagedObject();
}
+
+ TreeObject parent = _selectedNode;
+ while (parent != null && !parent.getType().equals(Constants.NODE_TYPE_VIRTUAL_HOST))
+ {
+ parent = parent.getParent();
+ }
+
+ if (parent != null)
+ {
+ _virtualHostName = parent.getName().substring(1, parent.getName().length()-1);
+ }
+
+
}
public static ManagedServer getServer()
@@ -427,15 +443,15 @@ public class MBeanView extends ViewPart
typeTabFolder.setData("CONTROLLER", controller);
TabItem tab = new TabItem(typeTabFolder, SWT.NONE);
- tab.setText(Constants.CONNECTION);
+ tab.setText(Constants.TAB_LABEL_CONNECTIONS);
tab.setControl(controller.getControl());
tab = new TabItem(typeTabFolder, SWT.NONE);
- tab.setText(Constants.EXCHANGE);
+ tab.setText(Constants.TAB_LABEL_EXCHANGES);
tab.setControl(controller.getControl());
tab = new TabItem(typeTabFolder, SWT.NONE);
- tab.setText(Constants.QUEUE);
+ tab.setText(Constants.TAB_LABEL_QUEUES);
tab.setControl(controller.getControl());
typeTabFolder.addListener(SWT.Selection, new Listener()
@@ -469,21 +485,30 @@ public class MBeanView extends ViewPart
}
typeTabFolder.setSelection(tab);
MBeanTypeTabControl controller = (MBeanTypeTabControl)typeTabFolder.getData("CONTROLLER");
- controller.refresh(tab.getText());
+ String nodeType = Constants.NODE_LABEL_CONNECTIONS;
+ if(tab.getText().equals(Constants.TAB_LABEL_QUEUES))
+ {
+ nodeType = Constants.NODE_LABEL_QUEUES;
+ }
+ else if(tab.getText().equals(Constants.TAB_LABEL_EXCHANGES))
+ {
+ nodeType = Constants.NODE_LABEL_EXCHANGES;
+ }
+ controller.refresh(nodeType);
typeTabFolder.setVisible(true);
}
- private void refreshTypeTabFolder(String type) throws Exception
+ private void refreshTypeTabFolder(String name) throws Exception
{
- if (Constants.CONNECTION.equals(type))
+ if (Constants.NODE_LABEL_CONNECTIONS.equals(name))
{
refreshTypeTabFolder(typeTabFolder.getItem(0));
}
- else if (Constants.EXCHANGE.equals(type))
+ else if (Constants.NODE_LABEL_EXCHANGES.equals(name))
{
refreshTypeTabFolder(typeTabFolder.getItem(1));
}
- else if (Constants.QUEUE.equals(type))
+ else if (Constants.NODE_LABEL_QUEUES.equals(name))
{
refreshTypeTabFolder(typeTabFolder.getItem(2));
}
@@ -516,5 +541,11 @@ public class MBeanView extends ViewPart
typeTabFolder.setVisible(false);
}
}
+
+ public static String getVirtualHostName()
+ {
+ return _virtualHostName;
+
+ }
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
index ef74f0c230..fd53aa31df 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.qpid.management.ui.ApplicationRegistry;
import org.apache.qpid.management.ui.Constants;
@@ -39,6 +40,7 @@ import org.apache.qpid.management.ui.exceptions.InfoRequiredException;
import org.apache.qpid.management.ui.exceptions.ManagementConsoleException;
import org.apache.qpid.management.ui.jmx.JMXServerRegistry;
import org.apache.qpid.management.ui.jmx.MBeanUtility;
+import org.apache.qpid.management.ui.jmx.JMXManagedObject;
import org.eclipse.jface.viewers.DoubleClickEvent;
import org.eclipse.jface.viewers.IDoubleClickListener;
import org.eclipse.jface.viewers.IFontProvider;
@@ -266,7 +268,8 @@ public class NavigationView extends ViewPart
catch(Exception ex)
{
System.out.println("\nError in connecting to Qpid broker ");
- System.out.println("\n" + ex.toString());
+ System.out.println("\n" + ex);
+ ex.printStackTrace();
}
}
@@ -284,27 +287,66 @@ public class NavigationView extends ViewPart
// Add these three types - Connection, Exchange, Queue
// By adding these, these will always be available, even if there are no mbeans under thse types
// This is required because, the mbeans will be added from mbeanview, by selecting from the list
- TreeObject typeChild = new TreeObject(Constants.CONNECTION, Constants.TYPE);
- typeChild.setParent(domain);
- typeChild = new TreeObject(Constants.EXCHANGE, Constants.TYPE);
- typeChild.setParent(domain);
- typeChild = new TreeObject(Constants.QUEUE, Constants.TYPE);
- typeChild.setParent(domain);
-
-
+
+
+ TreeObject virtualhosts = new TreeObject(Constants.NODE_LABEL_VIRTUAL_HOSTS, Constants.NODE_TYPE_MBEANTYPE);
+ virtualhosts.setParent(domain);
+
+ Map<String, TreeObject> virtualHostMap = new HashMap<String, TreeObject>();
+
// Now populate the mbenas under those types
List<ManagedBean> mbeans = MBeanUtility.getManagedObjectsForDomain(server, domain.getName());
for (ManagedBean mbean : mbeans)
{
+
+ if(mbean.getType().equals(Constants.VIRTUAL_HOST))
+ {
+ TreeObject host = new TreeObject("[" + mbean.getName() + "]", Constants.NODE_TYPE_VIRTUAL_HOST);
+
+ virtualHostMap.put(mbean.getName(), host);
+ host.setParent(virtualhosts);
+
+ TreeObject child = new TreeObject(Constants.NODE_LABEL_CONNECTIONS, Constants.NODE_TYPE_MBEANTYPE);
+
+ child.setParent(host);
+ child = new TreeObject(Constants.NODE_LABEL_EXCHANGES, Constants.NODE_TYPE_MBEANTYPE);
+ child.setParent(host);
+ child = new TreeObject(Constants.NODE_LABEL_QUEUES, Constants.NODE_TYPE_MBEANTYPE);
+ child.setParent(host);
+
+ }
+ }
+ for (ManagedBean mbean : mbeans)
+ {
+
+
mbean.setServer(server);
- ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server);
- serverRegistry.addManagedObject(mbean);
+ ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server);
+ serverRegistry.addManagedObject(mbean);
+
+ if (mbean.getType().equals(Constants.MBEAN_TYPE_BROKER_MANAGER))
+ {
+ JMXManagedObject obj = (JMXManagedObject) mbean;
+ String host = obj.getProperty("VirtualHost");
+ TreeObject node = virtualHostMap.get(host);
+ if(node != null)
+ {
+ TreeObject beanNode = new TreeObject(mbean);
+ beanNode.setParent(node);
+ }
+ }
+
+
// Add all mbeans other than Connections, Exchanges and Queues. Because these will be added
// manually by selecting from MBeanView
- if (!(mbean.getType().equals(Constants.CONNECTION) || mbean.getType().equals(Constants.EXCHANGE) || mbean.getType().equals(Constants.QUEUE)))
+ if (!(mbean.getType().equals(Constants.MBEAN_TYPE_CONNECTION)
+ || mbean.getType().equals(Constants.MBEAN_TYPE_EXCHANGE)
+ || mbean.getType().equals(Constants.MBEAN_TYPE_QUEUE)
+ || mbean.getType().equals(Constants.VIRTUAL_HOST)
+ || mbean.getType().equals(Constants.MBEAN_TYPE_BROKER_MANAGER)))
{
- addManagedBean(domain, mbean);
+ addManagedBean(domain, mbean, virtualHostMap);
}
}
}
@@ -322,7 +364,7 @@ public class NavigationView extends ViewPart
for (TreeObject child : childNodes)
{
- if (Constants.TYPE.equals(child.getType()) && typeName.equals(child.getName()))
+ if (Constants.NODE_TYPE_MBEANTYPE.equals(child.getType()) && typeName.equals(child.getName()))
return child;
}
return null;
@@ -343,13 +385,22 @@ public class NavigationView extends ViewPart
* Adds the given MBean to the given domain node. Creates Notification node for the MBean.
* @param domain
* @param mbean mbean
+ * @param virtualHostMap
*/
- private void addManagedBean(TreeObject domain, ManagedBean mbean) throws Exception
+ private void addManagedBean(TreeObject domain, ManagedBean mbean, Map<String, TreeObject> virtualHostMap) throws Exception
{
+ JMXManagedObject obj = (JMXManagedObject) mbean;
+
+
+
String type = mbean.getType();
String name = mbean.getName();
- TreeObject typeNode = getMBeanTypeNode(domain, type);
+ String virtualHostName = obj.getProperty("VirtualHost");
+
+ TreeObject virtualHostNode = virtualHostMap.get(virtualHostName);
+
+ TreeObject typeNode = getMBeanTypeNode(virtualHostNode, getNodeLabelForType(type));
if (typeNode != null && doesMBeanNodeAlreadyExist(typeNode, name))
return;
@@ -368,8 +419,8 @@ public class NavigationView extends ViewPart
// type node does not exist. Now check if node to be created as mbeantype or MBean
if (name != null) // A managedObject with type and name
{
- typeNode = new TreeObject(type, Constants.TYPE);
- typeNode.setParent(domain);
+ typeNode = new TreeObject(type, Constants.NODE_TYPE_MBEANTYPE);
+ typeNode.setParent(virtualHostNode);
mbeanNode = new TreeObject(mbean);
mbeanNode.setParent(typeNode);
}
@@ -385,7 +436,27 @@ public class NavigationView extends ViewPart
TreeObject notificationNode = new TreeObject(Constants.NOTIFICATION, Constants.NOTIFICATION);
notificationNode.setParent(mbeanNode);
}
-
+
+ private String getNodeLabelForType(String type)
+ {
+ if(type.equals(Constants.MBEAN_TYPE_EXCHANGE))
+ {
+ return Constants.NODE_LABEL_EXCHANGES;
+ }
+ else if(type.equals(Constants.MBEAN_TYPE_QUEUE))
+ {
+ return Constants.NODE_LABEL_QUEUES;
+ }
+ else if(type.equals(Constants.MBEAN_TYPE_CONNECTION))
+ {
+ return Constants.NODE_LABEL_CONNECTIONS;
+ }
+ else
+ {
+ return type;
+ }
+ }
+
/**
* Removes all the child nodes of the given parent node
* @param parent
@@ -750,8 +821,26 @@ public class NavigationView extends ViewPart
break;
}
}
+ for (TreeObject child : domain.getChildren())
+ {
+ if (child.getName().equals(Constants.NODE_LABEL_VIRTUAL_HOSTS))
+ {
+ domain = child;
+ break;
+ }
+ }
+ Map<String, TreeObject> hostMap = new HashMap<String,TreeObject>();
+
+ for (TreeObject child: domain.getChildren())
+ {
+
+ if(child.getType().equals(Constants.NODE_TYPE_VIRTUAL_HOST))
+ {
+ hostMap.put(child.getName().substring(1,child.getName().length()-1), child);
+ }
+ }
- addManagedBean(domain, mbean);
+ addManagedBean(domain, mbean, hostMap);
_treeViewer.refresh();
}
@@ -773,7 +862,6 @@ public class NavigationView extends ViewPart
{
for (ManagedBean mbean : removalList)
{
- System.out.println("removing " + mbean.getName() + " " + mbean.getType());
TreeObject treeServerObject = _managedServerMap.get(mbean.getServer());
List<TreeObject> domains = treeServerObject.getChildren();
TreeObject domain = null;
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java
index 3eb93f55d3..c45ad7b362 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NotificationsTabControl.java
@@ -326,7 +326,6 @@ public class NotificationsTabControl extends TabControl
Shell shell = null;
public void doubleClick(DoubleClickEvent event)
{
- System.out.println("DoubleClickEvent" + event);
display = Display.getCurrent();
shell = new Shell(display, SWT.BORDER | SWT.CLOSE | SWT.MIN |
SWT.MAX | SWT.RESIZE);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
index b0c67d03fe..204dd6f674 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
@@ -216,7 +216,7 @@ public class OperationTabControl extends TabControl
}
// Customised parameter widgets
- if (_mbean.getType().equals(Constants.EXCHANGE) &&
+ if (_mbean.getType().equals(Constants.MBEAN_TYPE_EXCHANGE) &&
"headers".equals(_mbean.getProperty(Constants.EXCHANGE_TYPE)) &&
_opData.getName().equalsIgnoreCase("createNewBinding"))
{
@@ -241,10 +241,10 @@ public class OperationTabControl extends TabControl
formData.top = new FormAttachment(0, params.indexOf(param) * heightForAParameter);
formData.left = new FormAttachment(label, 5);
formData.right = new FormAttachment(valueNumerator);
- if (param.getName().equals(Constants.QUEUE))
+ if (param.getName().equals(Constants.MBEAN_TYPE_QUEUE))
{
Combo combo = new Combo(_paramsComposite, SWT.READ_ONLY | SWT.DROP_DOWN);
- String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames();
+ String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_mbean.getProperty("VirtualHost"));
combo.setItems(items);
combo.add("Select Queue", 0);
combo.select(0);
@@ -253,10 +253,10 @@ public class OperationTabControl extends TabControl
combo.addSelectionListener(parameterSelectionListener);
valueInCombo = true;
}
- else if (param.getName().equals(Constants.EXCHANGE))
+ else if (param.getName().equals(Constants.MBEAN_TYPE_EXCHANGE))
{
Combo combo = new Combo(_paramsComposite, SWT.READ_ONLY | SWT.DROP_DOWN);
- String[] items = ApplicationRegistry.getServerRegistry(_mbean).getExchangeNames();
+ String[] items = ApplicationRegistry.getServerRegistry(_mbean).getExchangeNames(_mbean.getProperty("VirtualHost"));
combo.setItems(items);
combo.add("Select Exchange", 0);
combo.select(0);
@@ -358,7 +358,7 @@ public class OperationTabControl extends TabControl
formData.right = new FormAttachment(valueNumerator);
Combo combo = new Combo(composite, SWT.READ_ONLY | SWT.DROP_DOWN);
- String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames();
+ String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_mbean.getProperty("VirtualHost"));
combo.setItems(items);
combo.add("Select Queue", 0);
combo.select(0);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
index c2066a9277..c21be5d68c 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
@@ -277,7 +277,6 @@ public class ViewUtility
// Set the index being shown.
compositeHolder.setData(INDEX, index);
- System.out.println("index :" + index);
return (CompositeData)((Map.Entry)objectData.get(index)).getValue();
}
@@ -320,7 +319,6 @@ public class ViewUtility
if (itemType.isArray())
{
OpenType type = ((ArrayType)itemType).getElementOpenType();
- System.out.println("Array Element type = " + type.getClassName());
// If Byte array and mimetype is text, convert to text string
if (type.getClassName().equals(Byte.class.getName()))
{
@@ -390,7 +388,6 @@ public class ViewUtility
try
{
String textMessage = new String(byteArray, encoding);
- System.out.println("\nMessage : \n" + textMessage + "\n");
Text valueText = toolkit.createText(compositeHolder, textMessage, SWT.READ_ONLY | SWT.BORDER |
SWT.MULTI | SWT.WRAP | SWT.V_SCROLL);
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
index cbdd498b37..57512929c1 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -104,7 +104,7 @@ public class ServiceRequestingClient implements ExceptionListener
m.getPropertyNames();
if (m.propertyExists("timeSent"))
{
- long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
+ long timeSent = m.getLongProperty("timeSent");
if (_averageLatency == 0)
{
_averageLatency = _messageReceivedTime - timeSent;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index ec27b8a191..62dc44e23f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -23,6 +23,7 @@ import org.apache.qpid.server.management.ManagedBroker;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
public class AMQBrokerManagerMBeanTest extends TestCase
@@ -40,7 +41,9 @@ public class AMQBrokerManagerMBeanTest extends TestCase
assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
- ManagedBroker mbean = new AMQBrokerManagerMBean();
+ VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject());
mbean.createNewExchange(exchange1,"direct",false, false);
mbean.createNewExchange(exchange2,"topic",false, false);
mbean.createNewExchange(exchange3,"headers",false, false);
@@ -61,7 +64,9 @@ public class AMQBrokerManagerMBeanTest extends TestCase
public void testQueueOperations() throws Exception
{
String queueName = "testQueue_" + System.currentTimeMillis();
- ManagedBroker mbean = new AMQBrokerManagerMBean();
+ VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject());
assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
@@ -77,7 +82,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase
{
super.setUp();
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
+ _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry();
+ _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry();
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index ea576a5661..84506f4f48 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -178,7 +178,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
public TestQueue(AMQShortString name) throws AMQException
{
- super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getQueueRegistry());
+ super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
}
/**
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
index 39c47118da..9653155a51 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -21,7 +21,9 @@ import junit.framework.TestCase;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -36,6 +38,7 @@ public class ExchangeMBeanTest extends TestCase
{
private AMQQueue _queue;
private QueueRegistry _queueRegistry;
+ private VirtualHost _virtualHost;
/**
* Test for direct exchange mbean
@@ -45,7 +48,7 @@ public class ExchangeMBeanTest extends TestCase
public void testDirectExchangeMBean() throws Exception
{
DestNameExchange exchange = new DestNameExchange();
- exchange.initialise(ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -72,7 +75,7 @@ public class ExchangeMBeanTest extends TestCase
public void testTopicExchangeMBean() throws Exception
{
DestWildExchange exchange = new DestWildExchange();
- exchange.initialise(ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -99,7 +102,7 @@ public class ExchangeMBeanTest extends TestCase
public void testHeadersExchangeMBean() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -122,8 +125,11 @@ public class ExchangeMBeanTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _queueRegistry);
+
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost);
_queueRegistry.registerQueue(_queue);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index c01241d11d..70da7d1692 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.framing.BasicPublishBody;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
@@ -30,7 +31,7 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
protected void setUp() throws Exception
{
super.setUp();
- ApplicationRegistry.initialise(new TestApplicationRegistry());
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
}
public void testSimple() throws AMQException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index 546c61eda0..c8271f1549 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -4,6 +4,7 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
@@ -38,7 +39,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
{
super.setUp();
TransportConnection.createVMBroker(1);
- ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index 594330b945..dac0f06744 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -21,6 +21,9 @@ import junit.framework.TestCase;
import org.apache.mina.common.IoSession;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -46,6 +49,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
private AMQProtocolSessionMBean _mbean;
+ private VirtualHost _virtualHost;
public void testChannels() throws Exception
{
@@ -53,7 +57,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
int channelCount = _mbean.channels().size();
assertTrue(channelCount == 1);
AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
- false, new AMQShortString("test"), true, _queueRegistry);
+ false, new AMQShortString("test"), true, _virtualHost);
AMQChannel channel = new AMQChannel(2, _messageStore, null);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
@@ -106,10 +110,12 @@ public class AMQProtocolSessionMBeanTest extends TestCase
{
super.setUp();
_channel = new AMQChannel(1, _messageStore, null);
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory());
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _exchangeRegistry = _virtualHost.getExchangeRegistry();
_mockIOSession = new MockIoSession();
- _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true));
+ _protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true));
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 64492e3d67..ba60105824 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -24,6 +24,9 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
@@ -50,6 +53,7 @@ public class AMQQueueMBeanTest extends TestCase
new HashSet<Long>());
private MockProtocolSession _protocolSession;
private AMQChannel _channel;
+ private VirtualHost _virtualHost;
public void testMessageCount() throws Exception
{
@@ -180,8 +184,10 @@ public class AMQQueueMBeanTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _queueRegistry);
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index d10d5acdd0..2ec4eab74e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.LinkedList;
import java.util.Set;
@@ -67,7 +68,7 @@ public class AckTest extends TestCase
public AckTest() throws Exception
{
- ApplicationRegistry.initialise(new TestApplicationRegistry());
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
}
protected void setUp() throws Exception
@@ -78,7 +79,7 @@ public class AckTest extends TestCase
_protocolSession = new MockProtocolSession(_messageStore);
_protocolSession.addChannel(_channel);
_subscriptionManager = new SubscriptionSet();
- _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, new DefaultQueueRegistry(), _subscriptionManager);
+ _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
}
private void publishMessages(int count) throws AMQException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index f090f431c3..6f3d42d090 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
import java.util.concurrent.Executor;
@@ -50,11 +53,15 @@ public class ConcurrencyTest extends MessageTestHelper
private boolean isComplete;
private boolean failed;
+ private VirtualHost _virtualHost;
public ConcurrencyTest() throws Exception
{
+
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
- new DefaultQueueRegistry()));
+ _virtualHost));
}
public void testConcurrent1() throws InterruptedException, AMQException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 1fb2a1024f..3f371161c6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
@@ -137,4 +138,24 @@ public class MockProtocolSession implements AMQProtocolSession
{
return null;
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 34f70bd2db..385b5b598a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,9 +43,10 @@ public class SkeletonMessageStore implements MessageStore
public void configure(String base, Configuration config) throws Exception
{
}
-
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
+ //To change body of implemented methods use File | Settings | File Templates.
}
public void close() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index f801daf27c..849285e6d6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -29,14 +29,18 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.NullAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import java.util.HashMap;
+import java.util.Collection;
public class TestApplicationRegistry extends ApplicationRegistry
{
@@ -51,6 +55,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
private AuthenticationManager _authenticationManager;
private MessageStore _messageStore;
+ private VirtualHost _vHost;
public TestApplicationRegistry()
{
@@ -59,10 +64,12 @@ public class TestApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
- _managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _managedObjectRegistry = appRegistry.getManagedObjectRegistry();
+ _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _vHost.getQueueRegistry();
+ _exchangeFactory = _vHost.getExchangeFactory();
+ _exchangeRegistry = _vHost.getExchangeRegistry();
_authenticationManager = new NullAuthenticationManager();
_messageStore = new TestableMemoryMessageStore();
@@ -99,6 +106,16 @@ public class TestApplicationRegistry extends ApplicationRegistry
return _authenticationManager;
}
+ public Collection<String> getVirtualHostNames()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public MessageStore getMessageStore()
{
return _messageStore;