summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-02-16 23:11:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-02-16 23:11:41 +0000
commitdd8df96fcca8f5f9dcbe91ba012cff400a38daa7 (patch)
treeee84d98ec82abd31dd486f98fea1cb6bdb526db5
parent6213309b7c179fdddfeca0273d5c1f6592adedd7 (diff)
downloadqpid-python-dd8df96fcca8f5f9dcbe91ba012cff400a38daa7.tar.gz
QPID-375 : remove assumptions on standard exchanges (amq.direct, amq.topic, etc), allow other exchanges to be created through virtualhosts.xml
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508649 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/virtualhosts.xml110
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java147
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java67
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueue.java49
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java61
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/Session.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java9
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/cluster/Client.java6
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java4
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java9
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java8
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java4
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java8
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java6
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java8
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java6
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java4
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java4
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Start.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java27
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java63
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java18
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java14
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/Config.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java21
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Listener.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java5
71 files changed, 677 insertions, 278 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml
index 3601daacc7..52ff23e090 100644
--- a/java/broker/etc/virtualhosts.xml
+++ b/java/broker/etc/virtualhosts.xml
@@ -24,58 +24,54 @@
<virtualhost>
<name>localhost</name>
<localhost>
- <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
- <maximumMessageCount>5000</maximumMessageCount>
- <queue>
- <name>queue</name>
+ <exchanges>
+ <exchange>
+ <type>direct</type>
+ <name>test.direct</name>
+ <durable>true</durable>
+ </exchange>
+ <exchange>
+ <type>topic</type>
+ <name>test.topic</name>
+ </exchange>
+ </exchanges>
+ <queues>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+
+ <queue>
+ <name>queue</name>
+ </queue>
+ <queue>
+ <name>ping</name>
+ </queue>
<queue>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ <name>test-queue</name>
+ <test-queue>
+ <exchange>test.direct</exchange>
+ <durable>true</durable>
+ </test-queue>
</queue>
- </queue>
- <queue>
- <name>ping</name>
- <ping>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </ping>
- </queue>
+ <queue>
+ <name>test-ping</name>
+ <test-ping>
+ <exchange>test.direct</exchange>
+ </test-ping>
+ </queue>
+
+ </queues>
</localhost>
</virtualhost>
+
+
<virtualhost>
<name>development</name>
<development>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>5000</maximumMessageCount>
- <queue>
- <name>queue</name>
- <queue>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </queue>
- </queue>
- <queue>
- <name>ping</name>
- <ping>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </ping>
- </queue>
- </development>
- </virtualhost>
- <virtualhost>
- <name>test</name>
- <test>
- <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
- <maximumMessageCount>5000</maximumMessageCount>
+ <queues>
<queue>
<name>queue</name>
<queue>
@@ -94,6 +90,34 @@
<maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
</ping>
</queue>
+ </queues>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queues>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </queues>
</test>
</virtualhost>
</virtualhosts>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index bd8f0c9670..af38a9abe5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -71,7 +72,16 @@ public class VirtualHostConfiguration
throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
- List queueNames = configuration.getList("queue.name");
+ List exchangeNames = configuration.getList("exchanges.exchange.name");
+
+ for(Object exchangeNameObj : exchangeNames)
+ {
+ String exchangeName = String.valueOf(exchangeNameObj);
+ configureExchange(virtualHost, exchangeName, configuration);
+ }
+
+
+ List queueNames = configuration.getList("queues.queue.name");
for(Object queueNameObj : queueNames)
{
@@ -81,12 +91,49 @@ public class VirtualHostConfiguration
}
+ private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException
+ {
+
+ CompositeConfiguration exchangeConfiguration = new CompositeConfiguration();
+
+ exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
+ exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
+
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore messageStore = virtualHost.getMessageStore();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
+ AMQShortString exchangeName = new AMQShortString(exchangeNameString);
+
+
+ Exchange exchange;
+
+
+
+ synchronized (exchangeRegistry)
+ {
+ exchange = exchangeRegistry.getExchange(exchangeName);
+ if(exchange == null)
+ {
+
+ AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct"));
+ boolean durable = exchangeConfiguration.getBoolean("durable",false);
+ boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false);
+
+ Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+ exchangeRegistry.registerExchange(newExchange);
+ }
+
+ }
+ }
+
private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
{
CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString));
- queueConfiguration.addConfiguration(configuration);
+ queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString));
+ queueConfiguration.addConfiguration(configuration.subset("queues"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore messageStore = virtualHost.getMessageStore();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 7e3f9857f9..c7803133b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -55,10 +55,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange)
{
- if(_defaultExchange == null)
- {
- setDefaultExchange(exchange);
- }
_exchangeMap.put(exchange.getName(), exchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 7e378dfd01..3798918428 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -63,7 +63,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
if (body.exchange == null)
{
- body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
VirtualHost vHost = session.getVirtualHost();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 2218ff604f..a35cb9f7d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -107,7 +107,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queueRegistry.registerQueue(queue);
if (autoRegister)
{
- Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
defaultExchange.registerQueue(body.queue, queue, null);
queue.bind(body.queue, defaultExchange);
_log.info("Queue " + body.queue + " bound to default exchange");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
index 6b6163724c..fa8f13127a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
@@ -28,13 +28,14 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
public class ExchangeInitialiser
{
- public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{
+ public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{
+ define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
+ registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ebaa22ce44..2030876952 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,37 +20,6 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -59,6 +28,8 @@ import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -73,6 +44,25 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -157,12 +147,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
-
+
/**
* Configuration info for SSL
*/
private SSLConfiguration _sslConfiguration;
+ private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+
/**
* @param broker brokerdetails
* @param username username
@@ -180,7 +175,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
(clientName == null ? "" : clientName) + "/" +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
-
+
/**
* @param broker brokerdetails
* @param username username
@@ -198,20 +193,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
(clientName == null ? "" : clientName) + "/" +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
-
+
public AMQConnection(String host, int port, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, null);
}
-
+
public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(host, port, false, username, password, clientName, virtualHost, sslConfig);
+ this(host, port, false, username, password, clientName, virtualHost, sslConfig);
}
-
+
public AMQConnection(String host, int port, boolean useSSL, String username, String password,
String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
@@ -234,12 +229,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
this(new AMQConnectionURL(connection), null);
}
-
+
public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(connection), sslConfig);
}
-
+
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
@@ -257,6 +252,28 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
+
+ if (connectionURL.getDefaultQueueExchangeName() != null)
+ {
+ _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
+ }
+
+ if (connectionURL.getDefaultTopicExchangeName() != null)
+ {
+ _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
+ }
+
+ if (connectionURL.getTemporaryQueueExchangeName() != null)
+ {
+ _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
+ }
+
+ if (connectionURL.getTemporaryTopicExchangeName() != null)
+ {
+ _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
+ }
+
+
_failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
@@ -440,7 +457,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
}
@@ -1070,9 +1087,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQConnectionFactory.class.getName(),
null); // factory location
}
-
+
public SSLConfiguration getSSLConfiguration()
{
- return _sslConfiguration;
+ return _sslConfiguration;
+ }
+
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _defaultTopicExchangeName;
+ }
+
+
+ public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
+ {
+ _defaultTopicExchangeName = defaultTopicExchangeName;
+ }
+
+
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _defaultQueueExchangeName;
+ }
+
+
+ public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
+ {
+ _defaultQueueExchangeName = defaultQueueExchangeName;
+ }
+
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _temporaryTopicExchangeName;
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates.
+ }
+
+
+ public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
+ {
+ _temporaryTopicExchangeName = temporaryTopicExchangeName;
+ }
+
+ public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
+ {
+ _temporaryQueueExchangeName = temporaryQueueExchangeName;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index fea83d3128..0dcc544ea8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -27,11 +33,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
public class AMQConnectionURL implements ConnectionURL
{
private String _url;
@@ -43,6 +44,11 @@ public class AMQConnectionURL implements ConnectionURL
private String _username;
private String _password;
private String _virtualHost;
+ private AMQShortString _defaultQueueExchangeName;
+ private AMQShortString _defaultTopicExchangeName;
+ private AMQShortString _temporaryTopicExchangeName;
+ private AMQShortString _temporaryQueueExchangeName;
+
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
@@ -107,7 +113,7 @@ public class AMQConnectionURL implements ConnectionURL
if (userInfo == null)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
- "User information not found on url", fullURL);
+ "User information not found on url", fullURL);
}
else
{
@@ -161,7 +167,9 @@ public class AMQConnectionURL implements ConnectionURL
{
if (slash != 0 && fullURL.charAt(slash - 1) == ':')
{
- throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2,
+ "Virtual host looks like a windows path, forward slash not allowed in URL",
+ fullURL);
}
else
{
@@ -181,7 +189,7 @@ public class AMQConnectionURL implements ConnectionURL
if (colonIndex == -1)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
- "Null password in user information not allowed.", _url);
+ "Null password in user information not allowed.", _url);
}
else
{
@@ -230,6 +238,29 @@ public class AMQConnectionURL implements ConnectionURL
_options.remove(OPTIONS_FAILOVER);
}
+
+ if (_options.containsKey(OPTIONS_DEFAULT_TOPIC_EXCHANGE))
+ {
+ _defaultTopicExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_TOPIC_EXCHANGE));
+ }
+
+
+ if (_options.containsKey(OPTIONS_DEFAULT_QUEUE_EXCHANGE))
+ {
+ _defaultQueueExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_QUEUE_EXCHANGE));
+ }
+
+
+ if (_options.containsKey(OPTIONS_TEMPORARY_QUEUE_EXCHANGE))
+ {
+ _temporaryQueueExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_QUEUE_EXCHANGE));
+ }
+
+
+ if (_options.containsKey(OPTIONS_TEMPORARY_TOPIC_EXCHANGE))
+ {
+ _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE));
+ }
}
public String getURL()
@@ -332,6 +363,26 @@ public class AMQConnectionURL implements ConnectionURL
_options.put(key, value);
}
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _defaultQueueExchangeName;
+ }
+
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _defaultTopicExchangeName;
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _temporaryQueueExchangeName;
+ }
+
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _temporaryTopicExchangeName;
+ }
+
public String toString()
{
StringBuffer sb = new StringBuffer();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index a994dbc670..661372845a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -145,12 +145,12 @@ public abstract class AMQDestination implements Destination, Referenceable
public boolean isTopic()
{
- return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName);
+ return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass);
}
public boolean isQueue()
{
- return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
+ return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass);
}
public AMQShortString getDestinationName()
@@ -411,11 +411,11 @@ public abstract class AMQDestination implements Destination, Referenceable
if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
- return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
- return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable);
+ return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 44328e3555..9185bc87e8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import javax.jms.Queue;
+import javax.jms.Connection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -43,9 +44,19 @@ public class AMQQueue extends AMQDestination implements Queue
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(AMQShortString name)
+ public AMQQueue(AMQShortString exchangeName, String name)
{
- this(name, false);
+ this(exchangeName, new AMQShortString(name));
+ }
+
+
+ /**
+ * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+ * @param name the name of the queue
+ */
+ public AMQQueue(AMQShortString exchangeName, AMQShortString name)
+ {
+ this(exchangeName, name, false);
}
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
@@ -58,9 +69,20 @@ public class AMQQueue extends AMQDestination implements Queue
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(String name)
+ public AMQQueue(String exchangeName, String name)
{
- this(new AMQShortString(name), false);
+ this(new AMQShortString(exchangeName), new AMQShortString(name), false);
+ }
+
+
+ public AMQQueue(AMQConnection connection, String name)
+ {
+ this(connection.getDefaultQueueExchangeName(),name);
+ }
+
+ public AMQQueue(AMQConnection connection, String name, boolean temporary)
+ {
+ this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary);
}
@@ -71,9 +93,9 @@ public class AMQQueue extends AMQDestination implements Queue
* @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
* and exclusive
*/
- public AMQQueue(String name, boolean temporary)
+ public AMQQueue(String exchangeName, String name, boolean temporary)
{
- this(new AMQShortString(name),temporary);
+ this(new AMQShortString(exchangeName), new AMQShortString(name),temporary);
}
@@ -84,11 +106,11 @@ public class AMQQueue extends AMQDestination implements Queue
* @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
* and exclusive
*/
- public AMQQueue(AMQShortString name, boolean temporary)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString name, boolean temporary)
{
// queue name is set to null indicating that the broker assigns a name in the case of temporary queues
// temporary queues are typically used as response queues
- this(name, temporary?null:name, temporary, temporary, !temporary);
+ this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary);
}
@@ -99,19 +121,20 @@ public class AMQQueue extends AMQDestination implements Queue
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
*/
- public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
{
- this(destinationName, queueName, exclusive, autoDelete, false);
+ this(exchangeName, destinationName, queueName, exclusive, autoDelete, false);
}
- public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
- super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
autoDelete, queueName, durable);
}
-
+
+
public AMQShortString getRoutingKey()
{
return getAMQQueueName();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3973b5dd71..7ab26f3b47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1312,7 +1312,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotClosed();
if (queueName.indexOf('/') == -1)
{
- return new AMQQueue(queueName);
+ return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
}
else
{
@@ -1330,6 +1330,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _connection.getDefaultQueueExchangeName();
+ }
+
/**
* Creates a QueueReceiver wrapping a MessageConsumer
*
@@ -1379,7 +1384,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
}
else
{
@@ -1397,6 +1402,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _connection.getDefaultTopicExchangeName();
+ }
+
/**
* Creates a non-durable subscriber
*
@@ -1409,8 +1419,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = new AMQTopic(topic.getTopicName());
+ AMQTopic dest = checkValidTopic(topic);
+ //AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1428,16 +1438,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = new AMQTopic(topic.getTopicName());
+ AMQTopic dest = checkValidTopic(topic);
+ //AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1464,8 +1474,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getAMQQueueName()) &&
- !isQueueBound(dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1556,7 +1566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
+ if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
{
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
@@ -1567,17 +1577,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- boolean isQueueBound(AMQShortString queueName) throws JMSException
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
{
- return isQueueBound(queueName, null);
+ return isQueueBound(exchangeName, queueName, null);
}
- boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ exchangeName, // exchange
queueName, // queue
routingKey); // routingKey
AMQMethodEvent response = null;
@@ -1858,7 +1868,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws JMSException
+ private AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -1866,8 +1876,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
- throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
+ throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
+ if(!(topic instanceof AMQTopic))
+ {
+ throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
+ }
+ return (AMQTopic) topic;
}
private void checkValidQueue(Queue queue) throws InvalidDestinationException
@@ -1887,6 +1902,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _connection.getTemporaryTopicExchangeName();
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _connection.getTemporaryQueueExchangeName();
+ }
+
+
+
public int getTicket()
{
return _ticket;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index c350eb0c45..ce8e14506f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -40,7 +40,7 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor
*/
public AMQTemporaryQueue(AMQSession session)
{
- super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
+ super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
_session = session;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
index 241a9abc9b..6c954ec3df 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
@@ -36,7 +38,7 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDes
*/
public AMQTemporaryTopic(AMQSession session)
{
- super("TempQueue" + Long.toString(System.currentTimeMillis()));
+ super(session.getTemporaryTopicExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())));
_session = session;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 7d84ec6470..319e728edf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -39,32 +39,43 @@ public class AMQTopic extends AMQDestination implements Topic
super(binding);
}
- public AMQTopic(String name)
- {
- this(new AMQShortString(name));
- }
+// public AMQTopic(String exchangeName, String routingKey)
+// {
+// this(new AMQShortString(exchangeName), new AMQShortString(routingKey));
+// }
public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
{
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
}
+ public AMQTopic(AMQConnection conn, String routingKey)
+ {
+ this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
+ }
+
+
+ public AMQTopic(AMQShortString exchangeName, String routingKey)
+ {
+ this(exchangeName, new AMQShortString(routingKey));
+ }
- public AMQTopic(AMQShortString name)
+ public AMQTopic(AMQShortString exchangeName, AMQShortString routingKey)
{
- this(name, true, null, false);
+ this(exchangeName, routingKey, null);
}
- public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+ super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
queueName, isDurable);
}
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+ return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false,
+ getDurableTopicQueueName(subscriptionName, connection),
true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 3f8c1f65f8..19382b58c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -380,4 +380,9 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
}
}
+ public String toString()
+ {
+ return String.valueOf(System.identityHashCode(this));
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index caadb0f621..2d91e290c4 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.jms;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.List;
/**
@@ -35,6 +37,10 @@ public interface ConnectionURL
public static final String OPTIONS_FAILOVER = "failover";
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
+ public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
+ public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
+ public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
String getURL();
@@ -69,4 +75,12 @@ public interface ConnectionURL
String getOption(String key);
void setOption(String key, String value);
+
+ AMQShortString getDefaultQueueExchangeName();
+
+ AMQShortString getDefaultTopicExchangeName();
+
+ AMQShortString getTemporaryQueueExchangeName();
+
+ AMQShortString getTemporaryTopicExchangeName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/Session.java b/java/client/src/main/java/org/apache/qpid/jms/Session.java
index 025aef66c8..5287381fae 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/Session.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.jms;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -88,4 +90,12 @@ public interface Session extends javax.jms.Session
*/
MessageProducer createProducer(Destination destination, boolean immediate)
throws JMSException;
+
+ AMQShortString getTemporaryTopicExchangeName();
+
+ AMQShortString getDefaultQueueExchangeName();
+
+ AMQShortString getDefaultTopicExchangeName();
+
+ AMQShortString getTemporaryQueueExchangeName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 7a76aa0002..4f6f1561b6 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
@@ -236,12 +237,12 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
if(value instanceof AMQShortString)
{
- return new AMQQueue((AMQShortString) value);
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, (AMQShortString) value);
}
else if (value instanceof String)
{
- return new AMQQueue(new AMQShortString((String) value));
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString((String) value));
}
else if (value instanceof BindingURL)
@@ -259,11 +260,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
if(value instanceof AMQShortString)
{
- return new AMQTopic((AMQShortString)value);
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, (AMQShortString)value);
}
else if (value instanceof String)
{
- return new AMQTopic(new AMQShortString((String) value));
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
}
else if (value instanceof BindingURL)
diff --git a/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java b/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java
index 7a413eee3d..cf8059a143 100644
--- a/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java
+++ b/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java
@@ -25,6 +25,8 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.URLSyntaxException;
import javax.jms.MessageListener;
@@ -48,8 +50,8 @@ public class Client
this.name = name;
session = connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- AMQTopic topic = new AMQTopic("cluster_test_topic");
- AMQQueue queue = new AMQQueue("cluster_test_queue");
+ AMQTopic topic = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(), new AMQShortString("cluster_test_topic"));
+ AMQQueue queue = new AMQQueue(((AMQSession)session).getDefaultQueueExchangeName(), new AMQShortString("cluster_test_queue"));
topicProducer = session.createProducer(topic);
queueProducer = session.createProducer(queue);
diff --git a/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java b/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
index 0c57a73d5d..aba2d5d657 100644
--- a/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
+++ b/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
@@ -24,6 +24,8 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -41,7 +43,7 @@ public class ChannelFlowTest implements MessageListener
ChannelFlowTest(AMQConnection connection) throws Exception
{
- this(connection, new AMQQueue(randomize("ChannelFlowTest"), true));
+ this(connection, new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("ChannelFlowTest")), true));
}
ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
index 983186a545..a246352d8b 100644
--- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
+++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
@@ -21,9 +21,12 @@
package org.apache.qpid.fragmentation;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.log4j.Logger;
@@ -49,7 +52,7 @@ public class TestLargePublisher
private AMQConnection _connection;
- private Session _session;
+ private AMQSession _session;
private class CallbackHandler implements MessageListener
{
@@ -109,8 +112,8 @@ public class TestLargePublisher
{
createConnection(host, port, clientID);
- _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQTopic destination = new AMQTopic("large");
+ _session = (AMQSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic destination = new AMQTopic(_session.getDefaultTopicExchangeName(), new AMQShortString("large"));
MessageProducer producer = (MessageProducer) _session.createProducer(destination);
_connection.start();
diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
index 03ace4a8d9..b0cde22349 100644
--- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
+++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
@@ -22,7 +22,10 @@ package org.apache.qpid.fragmentation;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.log4j.Logger;
import javax.jms.*;
@@ -76,11 +79,12 @@ public class TestLargeSubscriber
InetAddress address = InetAddress.getLocalHost();
AMQConnection con = new AMQConnection(host, port, username, password,
address.getHostName(), virtualPath);
- final Session session = (Session) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final AMQSession session = (AMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
final int expectedMessageCount = numExpectedMessages;
- MessageConsumer consumer = session.createConsumer(new AMQTopic("large"),
+ MessageConsumer consumer = session.createConsumer(new AMQTopic(session.getDefaultTopicExchangeName(),
+ new AMQShortString("large")),
100, true, false, null);
consumer.setMessageListener(new MessageListener()
diff --git a/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java b/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java
index ebc9488f68..8d833f4d4c 100644
--- a/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java
+++ b/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java
@@ -24,6 +24,8 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.MessageProducer;
import javax.jms.Message;
@@ -51,7 +53,7 @@ public class LatencyTest implements MessageListener
LatencyTest(AMQConnection connection, int count, int delay, int length) throws Exception
{
- this(connection, new AMQQueue(randomize("LatencyTest"), true), count, delay, length);
+ this(connection, new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("LatencyTest")), true), count, delay, length);
}
LatencyTest(AMQConnection connection, AMQDestination destination, int count, int delay, int length) throws Exception
diff --git a/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java b/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
index 10e03d3522..db02b9954a 100644
--- a/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
+++ b/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
@@ -31,7 +31,6 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
@@ -41,6 +40,9 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Session;
/**
* Test AMQ.
@@ -54,7 +56,7 @@ public class AMQTest extends TestCase implements ExceptionListener
private static final String DUMMYCONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
private static final String HUGECONTENT;
- private Connection connect = null;
+ private AMQConnection connect = null;
private Session pubSession = null;
private Session subSession = null;
private Topic topic = null;
@@ -75,7 +77,7 @@ public class AMQTest extends TestCase implements ExceptionListener
connect.setExceptionListener(this);
pubSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
subSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- topic = new AMQTopic(SUBJECT);
+ topic = new AMQTopic(pubSession.getDefaultTopicExchangeName(), new AMQShortString(SUBJECT));
connect.start();
}
diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
index 45b241975d..33891142b5 100644
--- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
+++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
@@ -23,6 +23,8 @@ package org.apache.qpid.pubsub1;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.MessageProducer;
@@ -110,8 +112,8 @@ public class TestPublisher
{
createConnection(host, port, clientID);
- _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQTopic destination = new AMQTopic(commandQueueName);
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic destination = new AMQTopic(_session.getDefaultTopicExchangeName(), new AMQShortString(commandQueueName));
MessageProducer producer = (MessageProducer) _session.createProducer(destination);
_connection.start();
diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java
index 14cf206f50..450d9b3914 100644
--- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java
+++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java
@@ -24,6 +24,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -87,17 +89,17 @@ public class TestSubscriber
InetAddress address = InetAddress.getLocalHost();
AMQConnection con1 = new AMQConnection(args[0], Integer.parseInt(args[1]), args[2], args[3],
address.getHostName(), args[4]);
- final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
AMQConnection con2 = new AMQConnection(args[0], Integer.parseInt(args[1]), args[2], args[3],
address.getHostName(), args[4]);
- final org.apache.qpid.jms.Session session2 = (org.apache.qpid.jms.Session) con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
String selector = args[6];
final int expectedMessageCount = Integer.parseInt(args[5]);
_logger.info("Message selector is <" + selector + ">...");
- Topic t = new AMQTopic("cbr");
+ Topic t = new AMQTopic(session1.getDefaultTopicExchangeName(), new AMQShortString("cbr"));
MessageConsumer consumer1 = session1.createConsumer(t,
100, false, false, selector);
MessageConsumer consumer2 = session2.createConsumer(t,
diff --git a/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java b/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java
index 1520f18408..39d64069d1 100644
--- a/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java
+++ b/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java
@@ -22,6 +22,8 @@ package org.apache.qpid.topic;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -47,8 +49,8 @@ class MessageFactory
_session = session;
if(session instanceof AMQSession)
{
- _topic = new AMQTopic("topictest.messages");
- _control = new AMQTopic("topictest.control");
+ _topic = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(),new AMQShortString("topictest.messages"));
+ _control = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(),new AMQShortString("topictest.control"));
}
else
{
diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
index e0af4422a6..8f15bf089e 100644
--- a/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
@@ -22,6 +22,8 @@ package org.apache.qpid.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQQueue;
import javax.jms.Connection;
@@ -35,7 +37,7 @@ public class Ping
Config config = new Config(argv);
Connection con = config.createConnection();
con.setClientID("ping");
- new Relay(new AMQQueue("ping"), new AMQQueue("pong"), con,
+ new Relay(new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")), new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("pong")), con,
config.isEchoOn(),
config.getBatchSize(),
config.usePersistentMessages()).start();
diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
index 13295c137a..f4f4b20d7c 100644
--- a/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
@@ -22,6 +22,8 @@ package org.apache.qpid.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQQueue;
import javax.jms.Connection;
@@ -34,7 +36,7 @@ public class Pong
Config config = new Config(argv);
Connection con = config.createConnection();
con.setClientID("pong");
- new Relay(new AMQQueue("pong"), new AMQQueue("ping"), con,
+ new Relay(new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("pong")), new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")), con,
config.isEchoOn(),
config.getBatchSize(),
config.usePersistentMessages()).start();
diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
index 5564ed93ab..de718d828a 100644
--- a/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
@@ -22,6 +22,8 @@ package org.apache.qpid.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQQueue;
import javax.jms.Connection;
@@ -33,7 +35,7 @@ public class Start
public static void main(String[] argv) throws Exception
{
Connection con = new Config(argv).createConnection();
- AMQQueue ping = new AMQQueue("ping");
+ AMQQueue ping = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping"));
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(ping).send(session.createTextMessage("start"));
session.close();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 266e01b66a..338404a431 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -28,7 +28,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.Session;
+
import javax.jms.TextMessage;
import junit.framework.TestCase;
@@ -40,6 +40,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Session;
public class RecoverTest extends TestCase
{
@@ -66,15 +67,15 @@ public class RecoverTest extends TestCase
public void testRecoverResendsMsgs() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
+ Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -123,15 +124,15 @@ public class RecoverTest extends TestCase
public void testRecoverResendsMsgsAckOnEarlier() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
+ Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -187,15 +188,15 @@ public class RecoverTest extends TestCase
public void testAcknowledgePerConsumer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
- Queue queue2 = new AMQQueue(new AMQShortString("Q2"), new AMQShortString("Q2"), false, true);
+ Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ Queue queue2 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
MessageProducer producer2 = producerSession.createProducer(queue2);
@@ -226,10 +227,10 @@ public class RecoverTest extends TestCase
public void testRecoverInAutoAckListener() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true);
+ Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
index a6ae69c4de..0f336998f0 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
@@ -68,7 +68,7 @@ public class BytesMessageTest extends TestCase implements MessageListener
void init(AMQConnection connection) throws Exception
{
- init(connection, new AMQQueue(randomize("BytesMessageTest"), true));
+ init(connection, new AMQQueue(connection, randomize("BytesMessageTest"), true));
}
void init(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
index a957c651b2..1b32b73dbe 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -70,7 +70,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- init(connection, new AMQQueue(randomize("FieldTableMessageTest"), true));
+ init(connection, new AMQQueue(connection,randomize("FieldTableMessageTest"), true));
}
private void init(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
index 0cca3e4659..3830d61701 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
@@ -68,7 +68,7 @@ public class LargeMessageTest extends TestCase
private void init(AMQConnection connection) throws Exception
{
- Destination destination = new AMQQueue("LargeMessageTest", true);
+ Destination destination = new AMQQueue(connection,"LargeMessageTest", true);
init(connection, destination);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index 78a4bd6b49..75eb3a8d5e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -83,7 +83,7 @@ public class MapMessageTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- Destination destination = new AMQQueue(randomize("MapMessageTest"), true);
+ Destination destination = new AMQQueue(connection,randomize("MapMessageTest"), true);
init(connection, destination);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
index bdd80b43fe..c4e4753c21 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class MultipleConnectionTest extends TestCase
{
@@ -191,7 +192,7 @@ public class MultipleConnectionTest extends TestCase
String broker = _connectionString;
int messages = 10;
- AMQTopic topic = new AMQTopic("amq.topic");
+ AMQTopic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,"amq.topic");
Receiver[] receivers = new Receiver[]{
new Receiver(broker, topic, 2),
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
index e5add8fe08..099433e779 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -72,7 +72,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- init(connection, new AMQQueue(randomize("ObjectMessageTest"), true));
+ init(connection, new AMQQueue(connection,randomize("ObjectMessageTest"), true));
}
private void init(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index aceb40f4c7..463cdca17b 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -76,7 +76,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- Destination destination = new AMQQueue(randomize("PropertyValueTest"), true);
+ Destination destination = new AMQQueue(connection, randomize("PropertyValueTest"), true);
init(connection, destination);
}
@@ -132,7 +132,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
}
else
{
- q = new AMQQueue("TestReply");
+ q = new AMQQueue(_connection,"TestReply");
}
m.setJMSReplyTo(q);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
index b63677fc34..3aefc098aa 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -56,8 +56,11 @@ public class PubSubTwoConnectionTest extends TestCase
*/
public void testTwoConnections() throws Exception
{
- Topic topic = new AMQTopic("MyTopic");
- Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+
+ Topic topic = new AMQTopic(con1, "MyTopic");
+
Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageProducer producer = session1.createProducer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
index e6392916c8..668233f356 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
@@ -75,7 +75,7 @@ public class ReceiveTest extends TestCase
private void init(AMQConnection connection) throws Exception
{
- init(connection, new AMQQueue("ReceiveTest", true));
+ init(connection, new AMQQueue(connection,"ReceiveTest", true));
}
private void init(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
index 70bd50db15..0cdafebb1c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -61,7 +61,7 @@ public class SelectorTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+ init(connection, new AMQQueue(connection,randomize("SessionStartTest"), true));
}
private void init(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
index 498a18d1da..6cf64499aa 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
public class SessionStartTest extends TestCase implements MessageListener
{
@@ -53,7 +54,7 @@ public class SessionStartTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+ init(connection, new AMQQueue(connection.getDefaultQueueExchangeName(),new AMQShortString(randomize("SessionStartTest")), true));
}
private void init(AMQConnection connection, AMQDestination destination) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index d0d220c9e5..24c93a0af8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -41,6 +41,7 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
public class TextMessageTest extends TestCase implements MessageListener
{
@@ -74,7 +75,7 @@ public class TextMessageTest extends TestCase implements MessageListener
private void init(AMQConnection connection) throws Exception
{
- Destination destination = new AMQQueue(randomize("TextMessageTest"), true);
+ Destination destination = new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("TextMessageTest")), true);
init(connection, destination);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index 92f30e6478..0e15341615 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
public class AMQConnectionTest extends TestCase
{
@@ -45,8 +46,8 @@ public class AMQConnectionTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
_connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test");
- _topic = new AMQTopic("mytopic");
- _queue = new AMQQueue("myqueue");
+ _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
+ _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
index 6eff9d6d6a..78b7976f55 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -47,8 +47,8 @@ public class AMQSessionTest extends TestCase
{
super.setUp();
_connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test");
- _topic = new AMQTopic("mytopic");
- _queue = new AMQQueue("myqueue");
+ _topic = new AMQTopic(_connection,"mytopic");
+ _queue = new AMQQueue(_connection,"myqueue");
_session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index 8211c5d8cf..c7f1bb3065 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.client.transport.TransportConnection;
*/
public class ChannelCloseOkTest extends TestCase
{
- private Connection _connection;
+ private AMQConnection _connection;
private Destination _destination1;
private Destination _destination2;
private Session _session1;
@@ -77,8 +77,8 @@ public class ChannelCloseOkTest extends TestCase
TransportConnection.createVMBroker(1);
_connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test");
- _destination1 = new AMQQueue("q1", true);
- _destination2 = new AMQQueue("q2", true);
+ _destination1 = new AMQQueue(_connection,"q1", true);
+ _destination2 = new AMQQueue(_connection, "q2", true);
_session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_session1.createConsumer(_destination1).setMessageListener(new MessageListener()
{
@@ -164,7 +164,7 @@ public class ChannelCloseOkTest extends TestCase
assertEquals(1, _received2.size());
// Now send message to incorrect destination on session 1.
- Destination destination = new AMQQueue("incorrect");
+ Destination destination = new AMQQueue(_connection, "incorrect");
send(_session1, destination, "third"); // no point waiting as message will never be received.
// Ensure both sessions are still ok.
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
index 3aa8eaacef..d19a6095d5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -49,10 +49,10 @@ public class CloseWithBlockingReceiveTest extends TestCase
public void testReceiveReturnsNull() throws Exception
{
- final Connection connection = new AMQConnection("vm://:1", "guest", "guest",
+ final AMQConnection connection = new AMQConnection("vm://:1", "guest", "guest",
"fred", "test");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(new AMQTopic("banana"));
+ MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana"));
connection.start();
Runnable r = new Runnable()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
index b5586709d6..ae8e2cfbda 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
@@ -54,10 +54,12 @@ public class ConnectionStartTest extends TestCase
try
{
- AMQQueue queue = new AMQQueue("ConnectionStartTest");
+
AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest");
+
Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
MessageProducer pub = pubSess.createProducer(queue);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 585f52a959..d9ce080e14 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -20,16 +20,21 @@
*/
package org.apache.qpid.test.unit.client.connection;
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.Session;
+
+import junit.framework.TestCase;
+
+import javax.jms.Connection;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
public class ConnectionTest extends TestCase
{
@@ -54,7 +59,7 @@ public class ConnectionTest extends TestCase
{
try
{
- AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
conn.close();
}
catch (Exception e)
@@ -63,6 +68,54 @@ public class ConnectionTest extends TestCase
}
}
+
+ public void testDefaultExchanges()
+ {
+ try
+ {
+ AMQConnection conn = new AMQConnection("amqp://guest:guestd@clientid/test?brokerlist='"
+ + _broker
+ + "?retries='1''&defaultQueueExchange='test.direct'"
+ + "&defaultTopicExchange='test.topic'"
+ + "&temporaryQueueExchange='tmp.direct'"
+ + "&temporaryTopicExchange='tmp.topic'");
+
+ QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = (AMQQueue) queueSession.createQueue("MyQueue");
+
+ assertEquals(queue.getExchangeName().toString(), "test.direct");
+
+ AMQQueue tempQueue = (AMQQueue) queueSession.createTemporaryQueue();
+
+ assertEquals(tempQueue.getExchangeName().toString(), "tmp.direct");
+
+
+ queueSession.close();
+
+
+ TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic");
+
+ assertEquals(topic.getExchangeName().toString(), "test.topic");
+
+ AMQTopic tempTopic = (AMQTopic) topicSession.createTemporaryTopic();
+
+ assertEquals(tempTopic.getExchangeName().toString(), "tmp.topic");
+
+ topicSession.close();
+
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + _broker + " should succeed. Reason: " + e);
+ }
+ }
+
+
// FIXME The inVM broker currently has no authentication .. Needs added QPID-70
public void passwordFailureConnection() throws Exception
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 41ab535c6e..bfbba61913 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -456,6 +456,24 @@ public class ConnectionURLTest extends TestCase
}
}
+
+ public void testDefaultExchanges() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@id/test" + "?defaultQueueExchange='test.direct'&defaultTopicExchange='test.topic'&temporaryQueueExchange='tmp.direct'&temporaryTopicExchange='tmp.topic'";
+
+ AMQConnectionURL conn = new AMQConnectionURL(url);
+
+ assertEquals(conn.getDefaultQueueExchangeName(),"test.direct");
+
+ assertEquals(conn.getDefaultTopicExchangeName(),"test.topic");
+
+ assertEquals(conn.getTemporaryQueueExchangeName(),"tmp.direct");
+
+ assertEquals(conn.getTemporaryTopicExchangeName(),"tmp.topic");
+
+ }
+
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
index 92a0d4a8ee..b9394b87a1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
@@ -131,7 +131,7 @@ public class DestinationURLTest extends TestCase
AMQBindingURL dest = new AMQBindingURL(url);
assertTrue(dest.getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
- assertTrue(dest.getExchangeName().equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
+ assertTrue(dest.getExchangeName().equals(ExchangeDefaults.DEFAULT_EXCHANGE_NAME));
assertTrue(dest.getDestinationName().equals(""));
assertTrue(dest.getQueueName().equals("IBMPerfQueue1"));
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index 47be69c826..2e63bf4739 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -26,6 +26,7 @@ import javax.jms.MessageListener;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
/**
* Declare a private temporary response queue,
@@ -50,10 +51,10 @@ public class Client implements MessageListener
_connection = connection;
_expected = expected;
_session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- AMQQueue response = new AMQQueue("ResponseQueue", true);
+ AMQQueue response = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true);
_session.createConsumer(response).setMessageListener(this);
_connection.start();
- AMQQueue service = new SpecialQueue("ServiceQueue");
+ AMQQueue service = new SpecialQueue(_connection,"ServiceQueue");
Message request = _session.createTextMessage("Request!");
request.setJMSReplyTo(response);
_session.createProducer(service).send(request);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
index 6771453977..6593f7d86a 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
@@ -46,7 +46,7 @@ public class Service implements MessageListener
Service(AMQConnection connection) throws Exception
{
_connection = connection;
- AMQQueue queue = new SpecialQueue("ServiceQueue");
+ AMQQueue queue = new SpecialQueue(connection, "ServiceQueue");
_session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
_session.createConsumer(queue).setMessageListener(this);
_connection.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
index 691acbb213..27371b0397 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.unit.client.forwardall;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.framing.AMQShortString;
/**
@@ -32,14 +33,10 @@ class SpecialQueue extends AMQQueue
{
private final AMQShortString name;
- SpecialQueue(String name)
+ SpecialQueue(AMQConnection con, String name)
{
- this(new AMQShortString(name));
- }
- SpecialQueue(AMQShortString name)
- {
- super(name, true);
- this.name = name;
+ super(con, name, true);
+ this.name = new AMQShortString(name);
}
public AMQShortString getRoutingKey()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
index 5b9fb2549e..6d131cd52e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -56,7 +56,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
{
super.setUp();
connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test");
- destination = new AMQQueue(randomize("LatencyTest"), true);
+ destination = new AMQQueue(connection,randomize("LatencyTest"), true);
session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
//set up a consumer
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
index ddd08130e4..5e2703d5a5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
@@ -55,7 +55,7 @@ public class TopicPublisherCloseTest extends TestCase
{
AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test");
- Topic destination1 = new AMQTopic("t1");
+ Topic destination1 = new AMQTopic(connection, "t1");
TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher pub = session1.createPublisher(destination1);
connection.close();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
index bf2cfa3682..a2cd2e4da3 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
@@ -41,9 +41,9 @@ public class JMSDestinationTest extends TestCase
public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
+ Queue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 0903d0a5ba..dad1666299 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -51,7 +51,7 @@ public class JMSPropertiesTest extends TestCase
public static final String JMS_CORR_ID = "QPIDID_01";
public static final int JMS_DELIV_MODE = 1;
public static final String JMS_TYPE = "test.jms.type";
- public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto");
+
protected void setUp() throws Exception
{
@@ -68,15 +68,15 @@ public class JMSPropertiesTest extends TestCase
public void testJMSProperties() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
+ Queue queue = new AMQQueue(con.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
-
+ Destination JMS_REPLY_TO = new AMQQueue(con2,"my.replyto");
//create a test message to send
ObjectMessage sentMsg = new NonQpidObjectMessage();
sentMsg.setJMSCorrelationID(JMS_CORR_ID);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
index 8b617093fc..fd425b9930 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class MessageConverterTest extends TestCase
@@ -41,7 +42,7 @@ public class MessageConverterTest extends TestCase
public static final String JMS_CORR_ID = "QPIDID_01";
public static final int JMS_DELIV_MODE = 1;
public static final String JMS_TYPE = "test.jms.type";
- public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto");
+ public static final Destination JMS_REPLY_TO = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME,"my.replyto");
protected JMSTextMessage testTextMessage;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index a5eb0384d8..07ef5f04d4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -122,9 +123,9 @@ public class StreamMessageTest extends TestCase
public void testModifyReceivedMessageExpandsBuffer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- AMQQueue queue = new AMQQueue("testQ");
+ AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ"));
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index f957df2c34..0828ab398c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -54,8 +54,8 @@ public class DurableSubscriptionTest extends TestCase
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
- AMQTopic topic = new AMQTopic("MyTopic");
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con,"MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -95,8 +95,9 @@ public class DurableSubscriptionTest extends TestCase
public void testDurability() throws AMQException, JMSException, URLSyntaxException
{
- AMQTopic topic = new AMQTopic("MyTopic");
+
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con,"MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
index 9aebef71ca..929e2799a9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
@@ -51,8 +51,9 @@ public class TopicPublisherTest extends TestCase
public void testUnidentifiedProducer() throws Exception
{
- AMQTopic topic = new AMQTopic("MyTopic");
+
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con,"MyTopic");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(null);
MessageConsumer consumer1 = session1.createConsumer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 481441797f..fe7efb4e88 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -59,8 +59,9 @@ public class TopicSessionTest extends TestCase
public void testTopicSubscriptionUnsubscription() throws Exception
{
- AMQTopic topic = new AMQTopic("MyTopic");
+
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(),"MyTopic");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
@@ -104,9 +105,10 @@ public class TopicSessionTest extends TestCase
private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
{
- AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
- AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con,"MyTopic1" + String.valueOf(shutdown));
+ AMQTopic topic2 = new AMQTopic(con,"MyOtherTopic1" + String.valueOf(shutdown));
+
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
@@ -142,8 +144,9 @@ public class TopicSessionTest extends TestCase
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
- AMQTopic topic = new AMQTopic("MyTopic3");
AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con1,"MyTopic3");
+
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
@@ -171,8 +174,9 @@ public class TopicSessionTest extends TestCase
public void testTextMessageCreation() throws Exception
{
- AMQTopic topic = new AMQTopic("MyTopic4");
+
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con,"MyTopic4");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index e291da797c..4296e43f88 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -25,7 +25,6 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
@@ -37,6 +36,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Session;
public class TransactedTest extends TestCase
{
@@ -62,11 +62,13 @@ public class TransactedTest extends TestCase
{
super.setUp();
TransportConnection.createVMBroker(1);
- queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
- queue2 = new AMQQueue("Q2", false);
-
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
session = con.createSession(true, 0);
+ queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
+
+
+
consumer1 = session.createConsumer(queue1);
//Dummy just to create the queue.
MessageConsumer consumer2 = session.createConsumer(queue2);
@@ -147,15 +149,15 @@ public class TransactedTest extends TestCase
public void testResendsMsgsAfterSessionClose() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- AMQQueue queue3 = new AMQQueue("Q3", false);
+ AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue3);
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
index ad51fe498d..e5b4834622 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class Config
{
@@ -117,12 +118,12 @@ public class Config
if(isQueue())
{
System.out.println("Using queue named " + name);
- return new AMQQueue(name);
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME,name);
}
else if(isTopic())
{
System.out.println("Using topic named " + name);
- return new AMQTopic(name);
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,name);
}
else if(isHeaders())
{
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
index 172f1b1790..55f9566955 100644
--- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -24,6 +24,8 @@ import org.apache.qpid.framing.AMQShortString;
public class ExchangeDefaults
{
+ public final static AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("");
+
public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic");
@@ -41,8 +43,4 @@ public class ExchangeDefaults
public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
- public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt");
-
- public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt");
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
index 2ee4ce21cb..d44fc3cbd5 100644
--- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -62,7 +62,7 @@ public class AMQBindingURL implements BindingURL
if (exchangeClass == null)
{
_url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" +
- ExchangeDefaults.DIRECT_EXCHANGE_NAME + "//" + _url;
+ ExchangeDefaults.DEFAULT_EXCHANGE_NAME + "//" + _url;
//URLHelper.parseError(-1, "Exchange Class not specified.", _url);
parseBindingURL();
return;
@@ -76,7 +76,14 @@ public class AMQBindingURL implements BindingURL
if (exchangeName == null)
{
- throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+ if(getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ setExchangeName(ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
+ }
+ else
+ {
+ throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+ }
}
else
{
@@ -172,6 +179,11 @@ public class AMQBindingURL implements BindingURL
{
_exchangeClass = exchangeClass;
+ if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ setOption(BindingURL.OPTION_EXCLUSIVE, "true");
+ }
+
}
public AMQShortString getExchangeName()
@@ -182,11 +194,6 @@ public class AMQBindingURL implements BindingURL
private void setExchangeName(AMQShortString name)
{
_exchangeName = name;
-
- if (name.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
- {
- setOption(BindingURL.OPTION_EXCLUSIVE, "true");
- }
}
public AMQShortString getDestinationName()
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
index ab795d0459..78ab7c4c73 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
@@ -35,6 +35,7 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.Session;
import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
/**
* PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
@@ -414,11 +415,11 @@ public class PingPongBouncer implements MessageListener
{
if (isPubSub())
{
- _consumerDestination = new AMQTopic(name);
+ _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
}
else
{
- _consumerDestination = new AMQQueue(name);
+ _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 8def95f7b1..57d5c37fc6 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
@@ -704,13 +705,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if (_isPubSub)
{
_logger.debug("Creating topics.");
- destination = new AMQTopic(rootName + id);
+ destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
_logger.debug("Creating queues.");
- destination = new AMQQueue(rootName + id);
+ destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
}
// Keep the destination.
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
index 76a0690b8c..54f5a0f660 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
/**
* This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for
@@ -97,9 +98,9 @@ public class Listener implements MessageListener
if (_session instanceof AMQSession)
{
- _topic = new AMQTopic(CONTROL_TOPIC);
+ _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC);
//_control = new AMQTopic(CONTROL_TOPIC);
- _response = new AMQQueue(RESPONSE_QUEUE);
+ _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE);
}
else
{
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
index 8b87f76c3e..4efdc1cb56 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
@@ -24,6 +24,7 @@ import javax.jms.*;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
/**
*/
@@ -46,8 +47,8 @@ class MessageFactory
_session = session;
if (session instanceof AMQSession)
{
- _topic = new AMQTopic("topic_control");
- _control = new AMQTopic("topictest.control");
+ _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control");
+ _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control");
}
else
{