diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-16 23:11:41 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-16 23:11:41 +0000 |
commit | dd8df96fcca8f5f9dcbe91ba012cff400a38daa7 (patch) | |
tree | ee84d98ec82abd31dd486f98fea1cb6bdb526db5 | |
parent | 6213309b7c179fdddfeca0273d5c1f6592adedd7 (diff) | |
download | qpid-python-dd8df96fcca8f5f9dcbe91ba012cff400a38daa7.tar.gz |
QPID-375 : remove assumptions on standard exchanges (amq.direct, amq.topic, etc), allow other exchanges to be created through virtualhosts.xml
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508649 13f79535-47bb-0310-9956-ffa450edef68
71 files changed, 677 insertions, 278 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml index 3601daacc7..52ff23e090 100644 --- a/java/broker/etc/virtualhosts.xml +++ b/java/broker/etc/virtualhosts.xml @@ -24,58 +24,54 @@ <virtualhost> <name>localhost</name> <localhost> - <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> - <maximumMessageCount>5000</maximumMessageCount> - <queue> - <name>queue</name> + <exchanges> + <exchange> + <type>direct</type> + <name>test.direct</name> + <durable>true</durable> + </exchange> + <exchange> + <type>topic</type> + <name>test.topic</name> + </exchange> + </exchanges> + <queues> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + + <queue> + <name>queue</name> + </queue> + <queue> + <name>ping</name> + </queue> <queue> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + <name>test-queue</name> + <test-queue> + <exchange>test.direct</exchange> + <durable>true</durable> + </test-queue> </queue> - </queue> - <queue> - <name>ping</name> - <ping> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> - </ping> - </queue> + <queue> + <name>test-ping</name> + <test-ping> + <exchange>test.direct</exchange> + </test-ping> + </queue> + + </queues> </localhost> </virtualhost> + + <virtualhost> <name>development</name> <development> <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> <maximumMessageCount>5000</maximumMessageCount> - <queue> - <name>queue</name> - <queue> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> - </queue> - </queue> - <queue> - <name>ping</name> - <ping> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> - </ping> - </queue> - </development> - </virtualhost> - <virtualhost> - <name>test</name> - <test> - <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> - <maximumMessageCount>5000</maximumMessageCount> + <queues> <queue> <name>queue</name> <queue> @@ -94,6 +90,34 @@ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> </ping> </queue> + </queues> + </development> + </virtualhost> + <virtualhost> + <name>test</name> + <test> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <queues> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </queues> </test> </virtualhost> </virtualhosts> diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index bd8f0c9670..af38a9abe5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -32,6 +32,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -71,7 +72,16 @@ public class VirtualHostConfiguration throw new ConfigurationException("Unknown virtual host: " + virtualHostName); } - List queueNames = configuration.getList("queue.name"); + List exchangeNames = configuration.getList("exchanges.exchange.name"); + + for(Object exchangeNameObj : exchangeNames) + { + String exchangeName = String.valueOf(exchangeNameObj); + configureExchange(virtualHost, exchangeName, configuration); + } + + + List queueNames = configuration.getList("queues.queue.name"); for(Object queueNameObj : queueNames) { @@ -81,12 +91,49 @@ public class VirtualHostConfiguration } + private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException + { + + CompositeConfiguration exchangeConfiguration = new CompositeConfiguration(); + + exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); + exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); + + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore messageStore = virtualHost.getMessageStore(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + + AMQShortString exchangeName = new AMQShortString(exchangeNameString); + + + Exchange exchange; + + + + synchronized (exchangeRegistry) + { + exchange = exchangeRegistry.getExchange(exchangeName); + if(exchange == null) + { + + AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct")); + boolean durable = exchangeConfiguration.getBoolean("durable",false); + boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false); + + Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); + exchangeRegistry.registerExchange(newExchange); + } + + } + } + private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException { CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString)); - queueConfiguration.addConfiguration(configuration); + queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString)); + queueConfiguration.addConfiguration(configuration.subset("queues")); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MessageStore messageStore = virtualHost.getMessageStore(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 7e3f9857f9..c7803133b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -55,10 +55,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { - if(_defaultExchange == null) - { - setDefaultExchange(exchange); - } _exchangeMap.put(exchange.getName(), exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 7e378dfd01..3798918428 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -63,7 +63,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? if (body.exchange == null) { - body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME; } VirtualHost vHost = session.getVirtualHost(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 2218ff604f..a35cb9f7d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -107,7 +107,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queueRegistry.registerQueue(queue); if (autoRegister) { - Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); defaultExchange.registerQueue(body.queue, queue, null); queue.bind(body.queue, defaultExchange); _log.info("Queue " + body.queue + " bound to default exchange"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index 6b6163724c..fa8f13127a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -28,13 +28,14 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; public class ExchangeInitialiser { - public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{ + public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{ + define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME)); + registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)); } private void define(ExchangeRegistry r, ExchangeFactory f, diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ebaa22ce44..2030876952 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,37 +20,6 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -59,6 +28,8 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -73,6 +44,25 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -157,12 +147,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * The connection meta data */ private QpidConnectionMetaData _connectionMetaData; - + /** * Configuration info for SSL */ private SSLConfiguration _sslConfiguration; + private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + /** * @param broker brokerdetails * @param username username @@ -180,7 +175,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); } - + /** * @param broker brokerdetails * @param username username @@ -198,20 +193,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } - + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } - + public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(host, port, false, username, password, clientName, virtualHost, sslConfig); + this(host, port, false, username, password, clientName, virtualHost, sslConfig); } - + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException @@ -234,12 +229,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { this(new AMQConnectionURL(connection), null); } - + public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(connection), sslConfig); } - + public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { @@ -257,6 +252,28 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); + + if (connectionURL.getDefaultQueueExchangeName() != null) + { + _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); + } + + if (connectionURL.getDefaultTopicExchangeName() != null) + { + _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName(); + } + + if (connectionURL.getTemporaryQueueExchangeName() != null) + { + _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName(); + } + + if (connectionURL.getTemporaryTopicExchangeName() != null) + { + _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); + } + + _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); @@ -440,7 +457,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } @@ -1070,9 +1087,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } - + public SSLConfiguration getSSLConfiguration() { - return _sslConfiguration; + return _sslConfiguration; + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _defaultTopicExchangeName; + } + + + public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) + { + _defaultTopicExchangeName = defaultTopicExchangeName; + } + + + public AMQShortString getDefaultQueueExchangeName() + { + return _defaultQueueExchangeName; + } + + + public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) + { + _defaultQueueExchangeName = defaultQueueExchangeName; + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _temporaryTopicExchangeName; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates. + } + + + public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) + { + _temporaryTopicExchangeName = temporaryTopicExchangeName; + } + + public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName) + { + _temporaryQueueExchangeName = temporaryQueueExchangeName; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index fea83d3128..0dcc544ea8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -27,11 +33,6 @@ import java.util.LinkedList; import java.util.List; import java.util.StringTokenizer; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - public class AMQConnectionURL implements ConnectionURL { private String _url; @@ -43,6 +44,11 @@ public class AMQConnectionURL implements ConnectionURL private String _username; private String _password; private String _virtualHost; + private AMQShortString _defaultQueueExchangeName; + private AMQShortString _defaultTopicExchangeName; + private AMQShortString _temporaryTopicExchangeName; + private AMQShortString _temporaryQueueExchangeName; + public AMQConnectionURL(String fullURL) throws URLSyntaxException { @@ -107,7 +113,7 @@ public class AMQConnectionURL implements ConnectionURL if (userInfo == null) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, - "User information not found on url", fullURL); + "User information not found on url", fullURL); } else { @@ -161,7 +167,9 @@ public class AMQConnectionURL implements ConnectionURL { if (slash != 0 && fullURL.charAt(slash - 1) == ':') { - throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, + "Virtual host looks like a windows path, forward slash not allowed in URL", + fullURL); } else { @@ -181,7 +189,7 @@ public class AMQConnectionURL implements ConnectionURL if (colonIndex == -1) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); + "Null password in user information not allowed.", _url); } else { @@ -230,6 +238,29 @@ public class AMQConnectionURL implements ConnectionURL _options.remove(OPTIONS_FAILOVER); } + + if (_options.containsKey(OPTIONS_DEFAULT_TOPIC_EXCHANGE)) + { + _defaultTopicExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_TOPIC_EXCHANGE)); + } + + + if (_options.containsKey(OPTIONS_DEFAULT_QUEUE_EXCHANGE)) + { + _defaultQueueExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_QUEUE_EXCHANGE)); + } + + + if (_options.containsKey(OPTIONS_TEMPORARY_QUEUE_EXCHANGE)) + { + _temporaryQueueExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_QUEUE_EXCHANGE)); + } + + + if (_options.containsKey(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)) + { + _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)); + } } public String getURL() @@ -332,6 +363,26 @@ public class AMQConnectionURL implements ConnectionURL _options.put(key, value); } + public AMQShortString getDefaultQueueExchangeName() + { + return _defaultQueueExchangeName; + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _defaultTopicExchangeName; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _temporaryQueueExchangeName; + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _temporaryTopicExchangeName; + } + public String toString() { StringBuffer sb = new StringBuffer(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index a994dbc670..661372845a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -145,12 +145,12 @@ public abstract class AMQDestination implements Destination, Referenceable public boolean isTopic() { - return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName); + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass); } public boolean isQueue() { - return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName); + return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass); } public AMQShortString getDestinationName() @@ -411,11 +411,11 @@ public abstract class AMQDestination implements Destination, Referenceable if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { - return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable); + return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { - return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable); + return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 44328e3555..9185bc87e8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import javax.jms.Queue; +import javax.jms.Connection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -43,9 +44,19 @@ public class AMQQueue extends AMQDestination implements Queue * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. * @param name the name of the queue */ - public AMQQueue(AMQShortString name) + public AMQQueue(AMQShortString exchangeName, String name) { - this(name, false); + this(exchangeName, new AMQShortString(name)); + } + + + /** + * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. + * @param name the name of the queue + */ + public AMQQueue(AMQShortString exchangeName, AMQShortString name) + { + this(exchangeName, name, false); } public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) @@ -58,9 +69,20 @@ public class AMQQueue extends AMQDestination implements Queue * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. * @param name the name of the queue */ - public AMQQueue(String name) + public AMQQueue(String exchangeName, String name) { - this(new AMQShortString(name), false); + this(new AMQShortString(exchangeName), new AMQShortString(name), false); + } + + + public AMQQueue(AMQConnection connection, String name) + { + this(connection.getDefaultQueueExchangeName(),name); + } + + public AMQQueue(AMQConnection connection, String name, boolean temporary) + { + this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary); } @@ -71,9 +93,9 @@ public class AMQQueue extends AMQDestination implements Queue * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted * and exclusive */ - public AMQQueue(String name, boolean temporary) + public AMQQueue(String exchangeName, String name, boolean temporary) { - this(new AMQShortString(name),temporary); + this(new AMQShortString(exchangeName), new AMQShortString(name),temporary); } @@ -84,11 +106,11 @@ public class AMQQueue extends AMQDestination implements Queue * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted * and exclusive */ - public AMQQueue(AMQShortString name, boolean temporary) + public AMQQueue(AMQShortString exchangeName, AMQShortString name, boolean temporary) { // queue name is set to null indicating that the broker assigns a name in the case of temporary queues // temporary queues are typically used as response queues - this(name, temporary?null:name, temporary, temporary, !temporary); + this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary); } @@ -99,19 +121,20 @@ public class AMQQueue extends AMQDestination implements Queue * @param exclusive true if the queue should only permit a single consumer * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches */ - public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) + public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) { - this(destinationName, queueName, exclusive, autoDelete, false); + this(exchangeName, destinationName, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) + public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { - super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, autoDelete, queueName, durable); } - + + public AMQShortString getRoutingKey() { return getAMQQueueName(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3973b5dd71..7ab26f3b47 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1312,7 +1312,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); if (queueName.indexOf('/') == -1) { - return new AMQQueue(queueName); + return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); } else { @@ -1330,6 +1330,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public AMQShortString getDefaultQueueExchangeName() + { + return _connection.getDefaultQueueExchangeName(); + } + /** * Creates a QueueReceiver wrapping a MessageConsumer * @@ -1379,7 +1384,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (topicName.indexOf('/') == -1) { - return new AMQTopic(new AMQShortString(topicName)); + return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName)); } else { @@ -1397,6 +1402,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public AMQShortString getDefaultTopicExchangeName() + { + return _connection.getDefaultTopicExchangeName(); + } + /** * Creates a non-durable subscriber * @@ -1409,8 +1419,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = new AMQTopic(topic.getTopicName()); + AMQTopic dest = checkValidTopic(topic); + //AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1428,16 +1438,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = new AMQTopic(topic.getTopicName()); + AMQTopic dest = checkValidTopic(topic); + //AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); TopicSubscriberAdaptor subscriber = _subscriptions.get(name); if (subscriber != null) { @@ -1464,8 +1474,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getAMQQueueName()) && - !isQueueBound(dest.getAMQQueueName(), topicName)) + if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -1556,7 +1566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) + if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) { deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } @@ -1567,17 +1577,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - boolean isQueueBound(AMQShortString queueName) throws JMSException + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException { - return isQueueBound(queueName, null); + return isQueueBound(exchangeName, queueName, null); } - boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + exchangeName, // exchange queueName, // queue routingKey); // routingKey AMQMethodEvent response = null; @@ -1858,7 +1868,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws JMSException + private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) { @@ -1866,8 +1876,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) { - throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); + throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); } + if(!(topic instanceof AMQTopic)) + { + throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); + } + return (AMQTopic) topic; } private void checkValidQueue(Queue queue) throws InvalidDestinationException @@ -1887,6 +1902,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + public AMQShortString getTemporaryTopicExchangeName() + { + return _connection.getTemporaryTopicExchangeName(); + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _connection.getTemporaryQueueExchangeName(); + } + + + public int getTicket() { return _ticket; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index c350eb0c45..ce8e14506f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -40,7 +40,7 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor */ public AMQTemporaryQueue(AMQSession session) { - super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true); + super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true); _session = session; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java index 241a9abc9b..6c954ec3df 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; + import javax.jms.JMSException; import javax.jms.TemporaryTopic; @@ -36,7 +38,7 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDes */ public AMQTemporaryTopic(AMQSession session) { - super("TempQueue" + Long.toString(System.currentTimeMillis())); + super(session.getTemporaryTopicExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis()))); _session = session; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 7d84ec6470..319e728edf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -39,32 +39,43 @@ public class AMQTopic extends AMQDestination implements Topic super(binding); } - public AMQTopic(String name) - { - this(new AMQShortString(name)); - } +// public AMQTopic(String exchangeName, String routingKey) +// { +// this(new AMQShortString(exchangeName), new AMQShortString(routingKey)); +// } public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) { super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); } + public AMQTopic(AMQConnection conn, String routingKey) + { + this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); + } + + + public AMQTopic(AMQShortString exchangeName, String routingKey) + { + this(exchangeName, new AMQShortString(routingKey)); + } - public AMQTopic(AMQShortString name) + public AMQTopic(AMQShortString exchangeName, AMQShortString routingKey) { - this(name, true, null, false); + this(exchangeName, routingKey, null); } - public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, + super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { - return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection), + return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false, + getDurableTopicQueueName(subscriptionName, connection), true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 3f8c1f65f8..19382b58c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -380,4 +380,9 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag } } + public String toString() + { + return String.valueOf(System.identityHashCode(this)); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index caadb0f621..2d91e290c4 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.jms; +import org.apache.qpid.framing.AMQShortString; + import java.util.List; /** @@ -35,6 +37,10 @@ public interface ConnectionURL public static final String OPTIONS_FAILOVER = "failover"; public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; public static final String OPTIONS_SSL = "ssl"; + public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; + public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; + public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; + public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; String getURL(); @@ -69,4 +75,12 @@ public interface ConnectionURL String getOption(String key); void setOption(String key, String value); + + AMQShortString getDefaultQueueExchangeName(); + + AMQShortString getDefaultTopicExchangeName(); + + AMQShortString getTemporaryQueueExchangeName(); + + AMQShortString getTemporaryTopicExchangeName(); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/Session.java b/java/client/src/main/java/org/apache/qpid/jms/Session.java index 025aef66c8..5287381fae 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/Session.java +++ b/java/client/src/main/java/org/apache/qpid/jms/Session.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.jms; +import org.apache.qpid.framing.AMQShortString; + import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -88,4 +90,12 @@ public interface Session extends javax.jms.Session */ MessageProducer createProducer(Destination destination, boolean immediate) throws JMSException; + + AMQShortString getTemporaryTopicExchangeName(); + + AMQShortString getDefaultQueueExchangeName(); + + AMQShortString getDefaultTopicExchangeName(); + + AMQShortString getTemporaryQueueExchangeName(); } diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 7a76aa0002..4f6f1561b6 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -47,6 +47,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.exchange.ExchangeDefaults; public class PropertiesFileInitialContextFactory implements InitialContextFactory { @@ -236,12 +237,12 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { if(value instanceof AMQShortString) { - return new AMQQueue((AMQShortString) value); + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, (AMQShortString) value); } else if (value instanceof String) { - return new AMQQueue(new AMQShortString((String) value)); + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString((String) value)); } else if (value instanceof BindingURL) @@ -259,11 +260,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { if(value instanceof AMQShortString) { - return new AMQTopic((AMQShortString)value); + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, (AMQShortString)value); } else if (value instanceof String) { - return new AMQTopic(new AMQShortString((String) value)); + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value)); } else if (value instanceof BindingURL) diff --git a/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java b/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java index 7a413eee3d..cf8059a143 100644 --- a/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java +++ b/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java @@ -25,6 +25,8 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.URLSyntaxException; import javax.jms.MessageListener; @@ -48,8 +50,8 @@ public class Client this.name = name; session = connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); - AMQTopic topic = new AMQTopic("cluster_test_topic"); - AMQQueue queue = new AMQQueue("cluster_test_queue"); + AMQTopic topic = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(), new AMQShortString("cluster_test_topic")); + AMQQueue queue = new AMQQueue(((AMQSession)session).getDefaultQueueExchangeName(), new AMQShortString("cluster_test_queue")); topicProducer = session.createProducer(topic); queueProducer = session.createProducer(queue); diff --git a/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java b/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java index 0c57a73d5d..aba2d5d657 100644 --- a/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java +++ b/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java @@ -24,6 +24,8 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.jms.Message; import javax.jms.MessageListener; @@ -41,7 +43,7 @@ public class ChannelFlowTest implements MessageListener ChannelFlowTest(AMQConnection connection) throws Exception { - this(connection, new AMQQueue(randomize("ChannelFlowTest"), true)); + this(connection, new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("ChannelFlowTest")), true)); } ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java index 983186a545..a246352d8b 100644 --- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java +++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java @@ -21,9 +21,12 @@ package org.apache.qpid.fragmentation; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; import org.apache.log4j.Logger; @@ -49,7 +52,7 @@ public class TestLargePublisher private AMQConnection _connection; - private Session _session; + private AMQSession _session; private class CallbackHandler implements MessageListener { @@ -109,8 +112,8 @@ public class TestLargePublisher { createConnection(host, port, clientID); - _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - AMQTopic destination = new AMQTopic("large"); + _session = (AMQSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic destination = new AMQTopic(_session.getDefaultTopicExchangeName(), new AMQShortString("large")); MessageProducer producer = (MessageProducer) _session.createProducer(destination); _connection.start(); diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java index 03ace4a8d9..b0cde22349 100644 --- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java +++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java @@ -22,7 +22,10 @@ package org.apache.qpid.fragmentation; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.jms.Session; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import org.apache.log4j.Logger; import javax.jms.*; @@ -76,11 +79,12 @@ public class TestLargeSubscriber InetAddress address = InetAddress.getLocalHost(); AMQConnection con = new AMQConnection(host, port, username, password, address.getHostName(), virtualPath); - final Session session = (Session) con.createSession(false, Session.AUTO_ACKNOWLEDGE); + final AMQSession session = (AMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE); final int expectedMessageCount = numExpectedMessages; - MessageConsumer consumer = session.createConsumer(new AMQTopic("large"), + MessageConsumer consumer = session.createConsumer(new AMQTopic(session.getDefaultTopicExchangeName(), + new AMQShortString("large")), 100, true, false, null); consumer.setMessageListener(new MessageListener() diff --git a/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java b/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java index ebc9488f68..8d833f4d4c 100644 --- a/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java +++ b/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java @@ -24,6 +24,8 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.jms.MessageProducer; import javax.jms.Message; @@ -51,7 +53,7 @@ public class LatencyTest implements MessageListener LatencyTest(AMQConnection connection, int count, int delay, int length) throws Exception { - this(connection, new AMQQueue(randomize("LatencyTest"), true), count, delay, length); + this(connection, new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("LatencyTest")), true), count, delay, length); } LatencyTest(AMQConnection connection, AMQDestination destination, int count, int delay, int length) throws Exception diff --git a/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java b/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java index 10e03d3522..db02b9954a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java +++ b/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java @@ -31,7 +31,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; -import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; @@ -41,6 +40,9 @@ import org.apache.commons.codec.binary.Base64; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.Session; /** * Test AMQ. @@ -54,7 +56,7 @@ public class AMQTest extends TestCase implements ExceptionListener private static final String DUMMYCONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; private static final String HUGECONTENT; - private Connection connect = null; + private AMQConnection connect = null; private Session pubSession = null; private Session subSession = null; private Topic topic = null; @@ -75,7 +77,7 @@ public class AMQTest extends TestCase implements ExceptionListener connect.setExceptionListener(this); pubSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); subSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - topic = new AMQTopic(SUBJECT); + topic = new AMQTopic(pubSession.getDefaultTopicExchangeName(), new AMQShortString(SUBJECT)); connect.start(); } diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java index 45b241975d..33891142b5 100644 --- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java +++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java @@ -23,6 +23,8 @@ package org.apache.qpid.pubsub1; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.jms.MessageProducer; @@ -110,8 +112,8 @@ public class TestPublisher { createConnection(host, port, clientID); - _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - AMQTopic destination = new AMQTopic(commandQueueName); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic destination = new AMQTopic(_session.getDefaultTopicExchangeName(), new AMQShortString(commandQueueName)); MessageProducer producer = (MessageProducer) _session.createProducer(destination); _connection.start(); diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java index 14cf206f50..450d9b3914 100644 --- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java +++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java @@ -24,6 +24,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.jms.Session; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -87,17 +89,17 @@ public class TestSubscriber InetAddress address = InetAddress.getLocalHost(); AMQConnection con1 = new AMQConnection(args[0], Integer.parseInt(args[1]), args[2], args[3], address.getHostName(), args[4]); - final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQConnection con2 = new AMQConnection(args[0], Integer.parseInt(args[1]), args[2], args[3], address.getHostName(), args[4]); - final org.apache.qpid.jms.Session session2 = (org.apache.qpid.jms.Session) con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); String selector = args[6]; final int expectedMessageCount = Integer.parseInt(args[5]); _logger.info("Message selector is <" + selector + ">..."); - Topic t = new AMQTopic("cbr"); + Topic t = new AMQTopic(session1.getDefaultTopicExchangeName(), new AMQShortString("cbr")); MessageConsumer consumer1 = session1.createConsumer(t, 100, false, false, selector); MessageConsumer consumer2 = session2.createConsumer(t, diff --git a/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java b/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java index 1520f18408..39d64069d1 100644 --- a/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java +++ b/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.topic; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.jms.*; @@ -47,8 +49,8 @@ class MessageFactory _session = session; if(session instanceof AMQSession) { - _topic = new AMQTopic("topictest.messages"); - _control = new AMQTopic("topictest.control"); + _topic = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(),new AMQShortString("topictest.messages")); + _control = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(),new AMQShortString("topictest.control")); } else { diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java index e0af4422a6..8f15bf089e 100644 --- a/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java @@ -22,6 +22,8 @@ package org.apache.qpid.transacted; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.AMQQueue; import javax.jms.Connection; @@ -35,7 +37,7 @@ public class Ping Config config = new Config(argv); Connection con = config.createConnection(); con.setClientID("ping"); - new Relay(new AMQQueue("ping"), new AMQQueue("pong"), con, + new Relay(new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")), new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("pong")), con, config.isEchoOn(), config.getBatchSize(), config.usePersistentMessages()).start(); diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java index 13295c137a..f4f4b20d7c 100644 --- a/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java @@ -22,6 +22,8 @@ package org.apache.qpid.transacted; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.AMQQueue; import javax.jms.Connection; @@ -34,7 +36,7 @@ public class Pong Config config = new Config(argv); Connection con = config.createConnection(); con.setClientID("pong"); - new Relay(new AMQQueue("pong"), new AMQQueue("ping"), con, + new Relay(new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("pong")), new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")), con, config.isEchoOn(), config.getBatchSize(), config.usePersistentMessages()).start(); diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java index 5564ed93ab..de718d828a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java +++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java @@ -22,6 +22,8 @@ package org.apache.qpid.transacted; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.AMQQueue; import javax.jms.Connection; @@ -33,7 +35,7 @@ public class Start public static void main(String[] argv) throws Exception { Connection con = new Config(argv).createConnection(); - AMQQueue ping = new AMQQueue("ping"); + AMQQueue ping = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session.createProducer(ping).send(session.createTextMessage("start")); session.close(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 266e01b66a..338404a431 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -28,7 +28,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; -import javax.jms.Session; + import javax.jms.TextMessage; import junit.framework.TestCase; @@ -40,6 +40,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.Session; public class RecoverTest extends TestCase { @@ -66,15 +67,15 @@ public class RecoverTest extends TestCase public void testRecoverResendsMsgs() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); + Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -123,15 +124,15 @@ public class RecoverTest extends TestCase public void testRecoverResendsMsgsAckOnEarlier() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); + Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -187,15 +188,15 @@ public class RecoverTest extends TestCase public void testAcknowledgePerConsumer() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); - Queue queue2 = new AMQQueue(new AMQShortString("Q2"), new AMQShortString("Q2"), false, true); + Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + Queue queue2 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); MessageProducer producer2 = producerSession.createProducer(queue2); @@ -226,10 +227,10 @@ public class RecoverTest extends TestCase public void testRecoverInAutoAckListener() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true); + Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index a6ae69c4de..0f336998f0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -68,7 +68,7 @@ public class BytesMessageTest extends TestCase implements MessageListener void init(AMQConnection connection) throws Exception { - init(connection, new AMQQueue(randomize("BytesMessageTest"), true)); + init(connection, new AMQQueue(connection, randomize("BytesMessageTest"), true)); } void init(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index a957c651b2..1b32b73dbe 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -70,7 +70,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - init(connection, new AMQQueue(randomize("FieldTableMessageTest"), true)); + init(connection, new AMQQueue(connection,randomize("FieldTableMessageTest"), true)); } private void init(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java index 0cca3e4659..3830d61701 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java @@ -68,7 +68,7 @@ public class LargeMessageTest extends TestCase private void init(AMQConnection connection) throws Exception { - Destination destination = new AMQQueue("LargeMessageTest", true); + Destination destination = new AMQQueue(connection,"LargeMessageTest", true); init(connection, destination); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 78a4bd6b49..75eb3a8d5e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -83,7 +83,7 @@ public class MapMessageTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - Destination destination = new AMQQueue(randomize("MapMessageTest"), true); + Destination destination = new AMQQueue(connection,randomize("MapMessageTest"), true); init(connection, destination); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index bdd80b43fe..c4e4753c21 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.exchange.ExchangeDefaults; public class MultipleConnectionTest extends TestCase { @@ -191,7 +192,7 @@ public class MultipleConnectionTest extends TestCase String broker = _connectionString; int messages = 10; - AMQTopic topic = new AMQTopic("amq.topic"); + AMQTopic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,"amq.topic"); Receiver[] receivers = new Receiver[]{ new Receiver(broker, topic, 2), diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index e5add8fe08..099433e779 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -72,7 +72,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - init(connection, new AMQQueue(randomize("ObjectMessageTest"), true)); + init(connection, new AMQQueue(connection,randomize("ObjectMessageTest"), true)); } private void init(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index aceb40f4c7..463cdca17b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -76,7 +76,7 @@ public class PropertyValueTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - Destination destination = new AMQQueue(randomize("PropertyValueTest"), true); + Destination destination = new AMQQueue(connection, randomize("PropertyValueTest"), true); init(connection, destination); } @@ -132,7 +132,7 @@ public class PropertyValueTest extends TestCase implements MessageListener } else { - q = new AMQQueue("TestReply"); + q = new AMQQueue(_connection,"TestReply"); } m.setJMSReplyTo(q); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index b63677fc34..3aefc098aa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -56,8 +56,11 @@ public class PubSubTwoConnectionTest extends TestCase */ public void testTwoConnections() throws Exception { - Topic topic = new AMQTopic("MyTopic"); - Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + + Topic topic = new AMQTopic(con1, "MyTopic"); + Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageProducer producer = session1.createProducer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java index e6392916c8..668233f356 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java @@ -75,7 +75,7 @@ public class ReceiveTest extends TestCase private void init(AMQConnection connection) throws Exception { - init(connection, new AMQQueue("ReceiveTest", true)); + init(connection, new AMQQueue(connection,"ReceiveTest", true)); } private void init(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index 70bd50db15..0cdafebb1c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -61,7 +61,7 @@ public class SelectorTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - init(connection, new AMQQueue(randomize("SessionStartTest"), true)); + init(connection, new AMQQueue(connection,randomize("SessionStartTest"), true)); } private void init(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java index 498a18d1da..6cf64499aa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java @@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.framing.AMQShortString; public class SessionStartTest extends TestCase implements MessageListener { @@ -53,7 +54,7 @@ public class SessionStartTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - init(connection, new AMQQueue(randomize("SessionStartTest"), true)); + init(connection, new AMQQueue(connection.getDefaultQueueExchangeName(),new AMQShortString(randomize("SessionStartTest")), true)); } private void init(AMQConnection connection, AMQDestination destination) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index d0d220c9e5..24c93a0af8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -41,6 +41,7 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.framing.AMQShortString; public class TextMessageTest extends TestCase implements MessageListener { @@ -74,7 +75,7 @@ public class TextMessageTest extends TestCase implements MessageListener private void init(AMQConnection connection) throws Exception { - Destination destination = new AMQQueue(randomize("TextMessageTest"), true); + Destination destination = new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("TextMessageTest")), true); init(connection, destination); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 92f30e6478..0e15341615 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; public class AMQConnectionTest extends TestCase { @@ -45,8 +46,8 @@ public class AMQConnectionTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); - _topic = new AMQTopic("mytopic"); - _queue = new AMQQueue("myqueue"); + _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic")); + _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index 6eff9d6d6a..78b7976f55 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -47,8 +47,8 @@ public class AMQSessionTest extends TestCase { super.setUp(); _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); - _topic = new AMQTopic("mytopic"); - _queue = new AMQQueue("myqueue"); + _topic = new AMQTopic(_connection,"mytopic"); + _queue = new AMQQueue(_connection,"myqueue"); _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index 8211c5d8cf..c7f1bb3065 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -58,7 +58,7 @@ import org.apache.qpid.client.transport.TransportConnection; */ public class ChannelCloseOkTest extends TestCase { - private Connection _connection; + private AMQConnection _connection; private Destination _destination1; private Destination _destination2; private Session _session1; @@ -77,8 +77,8 @@ public class ChannelCloseOkTest extends TestCase TransportConnection.createVMBroker(1); _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"); - _destination1 = new AMQQueue("q1", true); - _destination2 = new AMQQueue("q2", true); + _destination1 = new AMQQueue(_connection,"q1", true); + _destination2 = new AMQQueue(_connection, "q2", true); _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); _session1.createConsumer(_destination1).setMessageListener(new MessageListener() { @@ -164,7 +164,7 @@ public class ChannelCloseOkTest extends TestCase assertEquals(1, _received2.size()); // Now send message to incorrect destination on session 1. - Destination destination = new AMQQueue("incorrect"); + Destination destination = new AMQQueue(_connection, "incorrect"); send(_session1, destination, "third"); // no point waiting as message will never be received. // Ensure both sessions are still ok. diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index 3aa8eaacef..d19a6095d5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -49,10 +49,10 @@ public class CloseWithBlockingReceiveTest extends TestCase public void testReceiveReturnsNull() throws Exception { - final Connection connection = new AMQConnection("vm://:1", "guest", "guest", + final AMQConnection connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(new AMQTopic("banana")); + MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana")); connection.start(); Runnable r = new Runnable() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java index b5586709d6..ae8e2cfbda 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -54,10 +54,12 @@ public class ConnectionStartTest extends TestCase try { - AMQQueue queue = new AMQQueue("ConnectionStartTest"); + AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest"); + Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); MessageProducer pub = pubSess.createProducer(queue); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 585f52a959..d9ce080e14 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -20,16 +20,21 @@ */ package org.apache.qpid.test.unit.client.connection; -import javax.jms.Connection; - -import junit.framework.TestCase; - import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jms.Session; + +import junit.framework.TestCase; + +import javax.jms.Connection; +import javax.jms.QueueSession; +import javax.jms.TopicSession; public class ConnectionTest extends TestCase { @@ -54,7 +59,7 @@ public class ConnectionTest extends TestCase { try { - AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); conn.close(); } catch (Exception e) @@ -63,6 +68,54 @@ public class ConnectionTest extends TestCase } } + + public void testDefaultExchanges() + { + try + { + AMQConnection conn = new AMQConnection("amqp://guest:guestd@clientid/test?brokerlist='" + + _broker + + "?retries='1''&defaultQueueExchange='test.direct'" + + "&defaultTopicExchange='test.topic'" + + "&temporaryQueueExchange='tmp.direct'" + + "&temporaryTopicExchange='tmp.topic'"); + + QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQQueue queue = (AMQQueue) queueSession.createQueue("MyQueue"); + + assertEquals(queue.getExchangeName().toString(), "test.direct"); + + AMQQueue tempQueue = (AMQQueue) queueSession.createTemporaryQueue(); + + assertEquals(tempQueue.getExchangeName().toString(), "tmp.direct"); + + + queueSession.close(); + + + TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic"); + + assertEquals(topic.getExchangeName().toString(), "test.topic"); + + AMQTopic tempTopic = (AMQTopic) topicSession.createTemporaryTopic(); + + assertEquals(tempTopic.getExchangeName().toString(), "tmp.topic"); + + topicSession.close(); + + + conn.close(); + } + catch (Exception e) + { + fail("Connection to " + _broker + " should succeed. Reason: " + e); + } + } + + // FIXME The inVM broker currently has no authentication .. Needs added QPID-70 public void passwordFailureConnection() throws Exception { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 41ab535c6e..bfbba61913 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -456,6 +456,24 @@ public class ConnectionURLTest extends TestCase } } + + public void testDefaultExchanges() throws URLSyntaxException + { + String url = "amqp://guest:guest@id/test" + "?defaultQueueExchange='test.direct'&defaultTopicExchange='test.topic'&temporaryQueueExchange='tmp.direct'&temporaryTopicExchange='tmp.topic'"; + + AMQConnectionURL conn = new AMQConnectionURL(url); + + assertEquals(conn.getDefaultQueueExchangeName(),"test.direct"); + + assertEquals(conn.getDefaultTopicExchangeName(),"test.topic"); + + assertEquals(conn.getTemporaryQueueExchangeName(),"tmp.direct"); + + assertEquals(conn.getTemporaryTopicExchangeName(),"tmp.topic"); + + } + + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionURLTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 92a0d4a8ee..b9394b87a1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -131,7 +131,7 @@ public class DestinationURLTest extends TestCase AMQBindingURL dest = new AMQBindingURL(url); assertTrue(dest.getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); - assertTrue(dest.getExchangeName().equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME)); + assertTrue(dest.getExchangeName().equals(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)); assertTrue(dest.getDestinationName().equals("")); assertTrue(dest.getQueueName().equals("IBMPerfQueue1")); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index 47be69c826..2e63bf4739 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -26,6 +26,7 @@ import javax.jms.MessageListener; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; /** * Declare a private temporary response queue, @@ -50,10 +51,10 @@ public class Client implements MessageListener _connection = connection; _expected = expected; _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); - AMQQueue response = new AMQQueue("ResponseQueue", true); + AMQQueue response = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true); _session.createConsumer(response).setMessageListener(this); _connection.start(); - AMQQueue service = new SpecialQueue("ServiceQueue"); + AMQQueue service = new SpecialQueue(_connection,"ServiceQueue"); Message request = _session.createTextMessage("Request!"); request.setJMSReplyTo(response); _session.createProducer(service).send(request); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java index 6771453977..6593f7d86a 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -46,7 +46,7 @@ public class Service implements MessageListener Service(AMQConnection connection) throws Exception { _connection = connection; - AMQQueue queue = new SpecialQueue("ServiceQueue"); + AMQQueue queue = new SpecialQueue(connection, "ServiceQueue"); _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); _session.createConsumer(queue).setMessageListener(this); _connection.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java index 691acbb213..27371b0397 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.unit.client.forwardall; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.framing.AMQShortString; /** @@ -32,14 +33,10 @@ class SpecialQueue extends AMQQueue { private final AMQShortString name; - SpecialQueue(String name) + SpecialQueue(AMQConnection con, String name) { - this(new AMQShortString(name)); - } - SpecialQueue(AMQShortString name) - { - super(name, true); - this.name = name; + super(con, name, true); + this.name = new AMQShortString(name); } public AMQShortString getRoutingKey() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index 5b9fb2549e..6d131cd52e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -56,7 +56,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener { super.setUp(); connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test"); - destination = new AMQQueue(randomize("LatencyTest"), true); + destination = new AMQQueue(connection,randomize("LatencyTest"), true); session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); //set up a consumer diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java index ddd08130e4..5e2703d5a5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java @@ -55,7 +55,7 @@ public class TopicPublisherCloseTest extends TestCase { AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test"); - Topic destination1 = new AMQTopic("t1"); + Topic destination1 = new AMQTopic(connection, "t1"); TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher pub = session1.createPublisher(destination1); connection.close(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java index bf2cfa3682..a2cd2e4da3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -41,9 +41,9 @@ public class JMSDestinationTest extends TestCase public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
+ Queue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 0903d0a5ba..dad1666299 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -51,7 +51,7 @@ public class JMSPropertiesTest extends TestCase public static final String JMS_CORR_ID = "QPIDID_01"; public static final int JMS_DELIV_MODE = 1; public static final String JMS_TYPE = "test.jms.type"; - public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto"); + protected void setUp() throws Exception { @@ -68,15 +68,15 @@ public class JMSPropertiesTest extends TestCase public void testJMSProperties() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); + Queue queue = new AMQQueue(con.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); - + Destination JMS_REPLY_TO = new AMQQueue(con2,"my.replyto"); //create a test message to send ObjectMessage sentMsg = new NonQpidObjectMessage(); sentMsg.setJMSCorrelationID(JMS_CORR_ID); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java index 8b617093fc..fd425b9930 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSMapMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.exchange.ExchangeDefaults; public class MessageConverterTest extends TestCase @@ -41,7 +42,7 @@ public class MessageConverterTest extends TestCase public static final String JMS_CORR_ID = "QPIDID_01"; public static final int JMS_DELIV_MODE = 1; public static final String JMS_TYPE = "test.jms.type"; - public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto"); + public static final Destination JMS_REPLY_TO = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME,"my.replyto"); protected JMSTextMessage testTextMessage; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index a5eb0384d8..07ef5f04d4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; @@ -122,9 +123,9 @@ public class StreamMessageTest extends TestCase public void testModifyReceivedMessageExpandsBuffer() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue = new AMQQueue("testQ"); + AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ")); MessageConsumer consumer = consumerSession.createConsumer(queue); consumer.setMessageListener(new MessageListener() { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index f957df2c34..0828ab398c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -54,8 +54,8 @@ public class DurableSubscriptionTest extends TestCase public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException { - AMQTopic topic = new AMQTopic("MyTopic"); AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con,"MyTopic"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); @@ -95,8 +95,9 @@ public class DurableSubscriptionTest extends TestCase public void testDurability() throws AMQException, JMSException, URLSyntaxException { - AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con,"MyTopic"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java index 9aebef71ca..929e2799a9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java @@ -51,8 +51,9 @@ public class TopicPublisherTest extends TestCase public void testUnidentifiedProducer() throws Exception { - AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con,"MyTopic"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(null); MessageConsumer consumer1 = session1.createConsumer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 481441797f..fe7efb4e88 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -59,8 +59,9 @@ public class TopicSessionTest extends TestCase public void testTopicSubscriptionUnsubscription() throws Exception { - AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(),"MyTopic"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0"); TopicPublisher publisher = session1.createPublisher(topic); @@ -104,9 +105,10 @@ public class TopicSessionTest extends TestCase private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception { - AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown)); - AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown)); AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con,"MyTopic1" + String.valueOf(shutdown)); + AMQTopic topic2 = new AMQTopic(con,"MyOtherTopic1" + String.valueOf(shutdown)); + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(null); @@ -142,8 +144,9 @@ public class TopicSessionTest extends TestCase public void testUnsubscriptionAfterConnectionClose() throws Exception { - AMQTopic topic = new AMQTopic("MyTopic3"); AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con1,"MyTopic3"); + TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); @@ -171,8 +174,9 @@ public class TopicSessionTest extends TestCase public void testTextMessageCreation() throws Exception { - AMQTopic topic = new AMQTopic("MyTopic4"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con,"MyTopic4"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index e291da797c..4296e43f88 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -25,7 +25,6 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import javax.jms.Session; import javax.jms.TextMessage; import junit.framework.TestCase; @@ -37,6 +36,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.Session; public class TransactedTest extends TestCase { @@ -62,11 +62,13 @@ public class TransactedTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); - queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); - queue2 = new AMQQueue("Q2", false); - con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); session = con.createSession(true, 0); + queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); + + + consumer1 = session.createConsumer(queue1); //Dummy just to create the queue. MessageConsumer consumer2 = session.createConsumer(queue2); @@ -147,15 +149,15 @@ public class TransactedTest extends TestCase public void testResendsMsgsAfterSessionClose() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue3 = new AMQQueue("Q3", false); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue3); diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java index ad51fe498d..e5b4834622 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQHeadersExchange; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; public class Config { @@ -117,12 +118,12 @@ public class Config if(isQueue()) { System.out.println("Using queue named " + name); - return new AMQQueue(name); + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME,name); } else if(isTopic()) { System.out.println("Using topic named " + name); - return new AMQTopic(name); + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,name); } else if(isHeaders()) { diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 172f1b1790..55f9566955 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -24,6 +24,8 @@ import org.apache.qpid.framing.AMQShortString; public class ExchangeDefaults { + public final static AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString(""); + public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic"); public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic"); @@ -41,8 +43,4 @@ public class ExchangeDefaults public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); - public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt"); - - public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt"); - } diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 2ee4ce21cb..d44fc3cbd5 100644 --- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -62,7 +62,7 @@ public class AMQBindingURL implements BindingURL if (exchangeClass == null) { _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + - ExchangeDefaults.DIRECT_EXCHANGE_NAME + "//" + _url; + ExchangeDefaults.DEFAULT_EXCHANGE_NAME + "//" + _url; //URLHelper.parseError(-1, "Exchange Class not specified.", _url); parseBindingURL(); return; @@ -76,7 +76,14 @@ public class AMQBindingURL implements BindingURL if (exchangeName == null) { - throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); + if(getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + setExchangeName(ExchangeDefaults.DEFAULT_EXCHANGE_NAME); + } + else + { + throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); + } } else { @@ -172,6 +179,11 @@ public class AMQBindingURL implements BindingURL { _exchangeClass = exchangeClass; + if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + setOption(BindingURL.OPTION_EXCLUSIVE, "true"); + } + } public AMQShortString getExchangeName() @@ -182,11 +194,6 @@ public class AMQBindingURL implements BindingURL private void setExchangeName(AMQShortString name) { _exchangeName = name; - - if (name.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) - { - setOption(BindingURL.OPTION_EXCLUSIVE, "true"); - } } public AMQShortString getDestinationName() diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java index ab795d0459..78ab7c4c73 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -35,6 +35,7 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.Session;
import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
/**
* PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
@@ -414,11 +415,11 @@ public class PingPongBouncer implements MessageListener {
if (isPubSub())
{
- _consumerDestination = new AMQTopic(name);
+ _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
}
else
{
- _consumerDestination = new AMQQueue(name);
+ _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 8def95f7b1..57d5c37fc6 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
@@ -704,13 +705,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (_isPubSub)
{
_logger.debug("Creating topics.");
- destination = new AMQTopic(rootName + id);
+ destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
_logger.debug("Creating queues.");
- destination = new AMQQueue(rootName + id);
+ destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
}
// Keep the destination.
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java index 76a0690b8c..54f5a0f660 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java @@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; /** * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for @@ -97,9 +98,9 @@ public class Listener implements MessageListener if (_session instanceof AMQSession) { - _topic = new AMQTopic(CONTROL_TOPIC); + _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC); //_control = new AMQTopic(CONTROL_TOPIC); - _response = new AMQQueue(RESPONSE_QUEUE); + _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE); } else { diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java index 8b87f76c3e..4efdc1cb56 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java @@ -24,6 +24,7 @@ import javax.jms.*; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; /** */ @@ -46,8 +47,8 @@ class MessageFactory _session = session; if (session instanceof AMQSession) { - _topic = new AMQTopic("topic_control"); - _control = new AMQTopic("topictest.control"); + _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control"); + _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control"); } else { |