summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-02-16 23:11:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-02-16 23:11:41 +0000
commitdd8df96fcca8f5f9dcbe91ba012cff400a38daa7 (patch)
treeee84d98ec82abd31dd486f98fea1cb6bdb526db5 /java/client/src/main
parent6213309b7c179fdddfeca0273d5c1f6592adedd7 (diff)
downloadqpid-python-dd8df96fcca8f5f9dcbe91ba012cff400a38daa7.tar.gz
QPID-375 : remove assumptions on standard exchanges (amq.direct, amq.topic, etc), allow other exchanges to be created through virtualhosts.xml
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508649 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java147
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java67
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueue.java49
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java61
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/Session.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java9
12 files changed, 305 insertions, 100 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ebaa22ce44..2030876952 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,37 +20,6 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -59,6 +28,8 @@ import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -73,6 +44,25 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -157,12 +147,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
-
+
/**
* Configuration info for SSL
*/
private SSLConfiguration _sslConfiguration;
+ private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+
/**
* @param broker brokerdetails
* @param username username
@@ -180,7 +175,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
(clientName == null ? "" : clientName) + "/" +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
-
+
/**
* @param broker brokerdetails
* @param username username
@@ -198,20 +193,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
(clientName == null ? "" : clientName) + "/" +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
-
+
public AMQConnection(String host, int port, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, null);
}
-
+
public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(host, port, false, username, password, clientName, virtualHost, sslConfig);
+ this(host, port, false, username, password, clientName, virtualHost, sslConfig);
}
-
+
public AMQConnection(String host, int port, boolean useSSL, String username, String password,
String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
@@ -234,12 +229,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
this(new AMQConnectionURL(connection), null);
}
-
+
public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(connection), sslConfig);
}
-
+
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
@@ -257,6 +252,28 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
+
+ if (connectionURL.getDefaultQueueExchangeName() != null)
+ {
+ _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
+ }
+
+ if (connectionURL.getDefaultTopicExchangeName() != null)
+ {
+ _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
+ }
+
+ if (connectionURL.getTemporaryQueueExchangeName() != null)
+ {
+ _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
+ }
+
+ if (connectionURL.getTemporaryTopicExchangeName() != null)
+ {
+ _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
+ }
+
+
_failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
@@ -440,7 +457,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
}
@@ -1070,9 +1087,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQConnectionFactory.class.getName(),
null); // factory location
}
-
+
public SSLConfiguration getSSLConfiguration()
{
- return _sslConfiguration;
+ return _sslConfiguration;
+ }
+
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _defaultTopicExchangeName;
+ }
+
+
+ public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
+ {
+ _defaultTopicExchangeName = defaultTopicExchangeName;
+ }
+
+
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _defaultQueueExchangeName;
+ }
+
+
+ public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
+ {
+ _defaultQueueExchangeName = defaultQueueExchangeName;
+ }
+
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _temporaryTopicExchangeName;
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates.
+ }
+
+
+ public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
+ {
+ _temporaryTopicExchangeName = temporaryTopicExchangeName;
+ }
+
+ public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
+ {
+ _temporaryQueueExchangeName = temporaryQueueExchangeName;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index fea83d3128..0dcc544ea8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -27,11 +33,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
public class AMQConnectionURL implements ConnectionURL
{
private String _url;
@@ -43,6 +44,11 @@ public class AMQConnectionURL implements ConnectionURL
private String _username;
private String _password;
private String _virtualHost;
+ private AMQShortString _defaultQueueExchangeName;
+ private AMQShortString _defaultTopicExchangeName;
+ private AMQShortString _temporaryTopicExchangeName;
+ private AMQShortString _temporaryQueueExchangeName;
+
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
@@ -107,7 +113,7 @@ public class AMQConnectionURL implements ConnectionURL
if (userInfo == null)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
- "User information not found on url", fullURL);
+ "User information not found on url", fullURL);
}
else
{
@@ -161,7 +167,9 @@ public class AMQConnectionURL implements ConnectionURL
{
if (slash != 0 && fullURL.charAt(slash - 1) == ':')
{
- throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2,
+ "Virtual host looks like a windows path, forward slash not allowed in URL",
+ fullURL);
}
else
{
@@ -181,7 +189,7 @@ public class AMQConnectionURL implements ConnectionURL
if (colonIndex == -1)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
- "Null password in user information not allowed.", _url);
+ "Null password in user information not allowed.", _url);
}
else
{
@@ -230,6 +238,29 @@ public class AMQConnectionURL implements ConnectionURL
_options.remove(OPTIONS_FAILOVER);
}
+
+ if (_options.containsKey(OPTIONS_DEFAULT_TOPIC_EXCHANGE))
+ {
+ _defaultTopicExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_TOPIC_EXCHANGE));
+ }
+
+
+ if (_options.containsKey(OPTIONS_DEFAULT_QUEUE_EXCHANGE))
+ {
+ _defaultQueueExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_QUEUE_EXCHANGE));
+ }
+
+
+ if (_options.containsKey(OPTIONS_TEMPORARY_QUEUE_EXCHANGE))
+ {
+ _temporaryQueueExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_QUEUE_EXCHANGE));
+ }
+
+
+ if (_options.containsKey(OPTIONS_TEMPORARY_TOPIC_EXCHANGE))
+ {
+ _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE));
+ }
}
public String getURL()
@@ -332,6 +363,26 @@ public class AMQConnectionURL implements ConnectionURL
_options.put(key, value);
}
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _defaultQueueExchangeName;
+ }
+
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _defaultTopicExchangeName;
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _temporaryQueueExchangeName;
+ }
+
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _temporaryTopicExchangeName;
+ }
+
public String toString()
{
StringBuffer sb = new StringBuffer();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index a994dbc670..661372845a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -145,12 +145,12 @@ public abstract class AMQDestination implements Destination, Referenceable
public boolean isTopic()
{
- return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName);
+ return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass);
}
public boolean isQueue()
{
- return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
+ return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass);
}
public AMQShortString getDestinationName()
@@ -411,11 +411,11 @@ public abstract class AMQDestination implements Destination, Referenceable
if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
- return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
- return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable);
+ return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 44328e3555..9185bc87e8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import javax.jms.Queue;
+import javax.jms.Connection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -43,9 +44,19 @@ public class AMQQueue extends AMQDestination implements Queue
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(AMQShortString name)
+ public AMQQueue(AMQShortString exchangeName, String name)
{
- this(name, false);
+ this(exchangeName, new AMQShortString(name));
+ }
+
+
+ /**
+ * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+ * @param name the name of the queue
+ */
+ public AMQQueue(AMQShortString exchangeName, AMQShortString name)
+ {
+ this(exchangeName, name, false);
}
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
@@ -58,9 +69,20 @@ public class AMQQueue extends AMQDestination implements Queue
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(String name)
+ public AMQQueue(String exchangeName, String name)
{
- this(new AMQShortString(name), false);
+ this(new AMQShortString(exchangeName), new AMQShortString(name), false);
+ }
+
+
+ public AMQQueue(AMQConnection connection, String name)
+ {
+ this(connection.getDefaultQueueExchangeName(),name);
+ }
+
+ public AMQQueue(AMQConnection connection, String name, boolean temporary)
+ {
+ this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary);
}
@@ -71,9 +93,9 @@ public class AMQQueue extends AMQDestination implements Queue
* @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
* and exclusive
*/
- public AMQQueue(String name, boolean temporary)
+ public AMQQueue(String exchangeName, String name, boolean temporary)
{
- this(new AMQShortString(name),temporary);
+ this(new AMQShortString(exchangeName), new AMQShortString(name),temporary);
}
@@ -84,11 +106,11 @@ public class AMQQueue extends AMQDestination implements Queue
* @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
* and exclusive
*/
- public AMQQueue(AMQShortString name, boolean temporary)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString name, boolean temporary)
{
// queue name is set to null indicating that the broker assigns a name in the case of temporary queues
// temporary queues are typically used as response queues
- this(name, temporary?null:name, temporary, temporary, !temporary);
+ this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary);
}
@@ -99,19 +121,20 @@ public class AMQQueue extends AMQDestination implements Queue
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
*/
- public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
{
- this(destinationName, queueName, exclusive, autoDelete, false);
+ this(exchangeName, destinationName, queueName, exclusive, autoDelete, false);
}
- public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
- super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
autoDelete, queueName, durable);
}
-
+
+
public AMQShortString getRoutingKey()
{
return getAMQQueueName();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3973b5dd71..7ab26f3b47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1312,7 +1312,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotClosed();
if (queueName.indexOf('/') == -1)
{
- return new AMQQueue(queueName);
+ return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
}
else
{
@@ -1330,6 +1330,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _connection.getDefaultQueueExchangeName();
+ }
+
/**
* Creates a QueueReceiver wrapping a MessageConsumer
*
@@ -1379,7 +1384,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
}
else
{
@@ -1397,6 +1402,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _connection.getDefaultTopicExchangeName();
+ }
+
/**
* Creates a non-durable subscriber
*
@@ -1409,8 +1419,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = new AMQTopic(topic.getTopicName());
+ AMQTopic dest = checkValidTopic(topic);
+ //AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1428,16 +1438,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = new AMQTopic(topic.getTopicName());
+ AMQTopic dest = checkValidTopic(topic);
+ //AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1464,8 +1474,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getAMQQueueName()) &&
- !isQueueBound(dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1556,7 +1566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
+ if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
{
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
@@ -1567,17 +1577,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- boolean isQueueBound(AMQShortString queueName) throws JMSException
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
{
- return isQueueBound(queueName, null);
+ return isQueueBound(exchangeName, queueName, null);
}
- boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ exchangeName, // exchange
queueName, // queue
routingKey); // routingKey
AMQMethodEvent response = null;
@@ -1858,7 +1868,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws JMSException
+ private AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -1866,8 +1876,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
- throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
+ throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
+ if(!(topic instanceof AMQTopic))
+ {
+ throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
+ }
+ return (AMQTopic) topic;
}
private void checkValidQueue(Queue queue) throws InvalidDestinationException
@@ -1887,6 +1902,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _connection.getTemporaryTopicExchangeName();
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _connection.getTemporaryQueueExchangeName();
+ }
+
+
+
public int getTicket()
{
return _ticket;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index c350eb0c45..ce8e14506f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -40,7 +40,7 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor
*/
public AMQTemporaryQueue(AMQSession session)
{
- super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
+ super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
_session = session;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
index 241a9abc9b..6c954ec3df 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.JMSException;
import javax.jms.TemporaryTopic;
@@ -36,7 +38,7 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDes
*/
public AMQTemporaryTopic(AMQSession session)
{
- super("TempQueue" + Long.toString(System.currentTimeMillis()));
+ super(session.getTemporaryTopicExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())));
_session = session;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 7d84ec6470..319e728edf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -39,32 +39,43 @@ public class AMQTopic extends AMQDestination implements Topic
super(binding);
}
- public AMQTopic(String name)
- {
- this(new AMQShortString(name));
- }
+// public AMQTopic(String exchangeName, String routingKey)
+// {
+// this(new AMQShortString(exchangeName), new AMQShortString(routingKey));
+// }
public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
{
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
}
+ public AMQTopic(AMQConnection conn, String routingKey)
+ {
+ this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
+ }
+
+
+ public AMQTopic(AMQShortString exchangeName, String routingKey)
+ {
+ this(exchangeName, new AMQShortString(routingKey));
+ }
- public AMQTopic(AMQShortString name)
+ public AMQTopic(AMQShortString exchangeName, AMQShortString routingKey)
{
- this(name, true, null, false);
+ this(exchangeName, routingKey, null);
}
- public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+ super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
queueName, isDurable);
}
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+ return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false,
+ getDurableTopicQueueName(subscriptionName, connection),
true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 3f8c1f65f8..19382b58c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -380,4 +380,9 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
}
}
+ public String toString()
+ {
+ return String.valueOf(System.identityHashCode(this));
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index caadb0f621..2d91e290c4 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.jms;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.List;
/**
@@ -35,6 +37,10 @@ public interface ConnectionURL
public static final String OPTIONS_FAILOVER = "failover";
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
+ public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
+ public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
+ public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
String getURL();
@@ -69,4 +75,12 @@ public interface ConnectionURL
String getOption(String key);
void setOption(String key, String value);
+
+ AMQShortString getDefaultQueueExchangeName();
+
+ AMQShortString getDefaultTopicExchangeName();
+
+ AMQShortString getTemporaryQueueExchangeName();
+
+ AMQShortString getTemporaryTopicExchangeName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/Session.java b/java/client/src/main/java/org/apache/qpid/jms/Session.java
index 025aef66c8..5287381fae 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/Session.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.jms;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -88,4 +90,12 @@ public interface Session extends javax.jms.Session
*/
MessageProducer createProducer(Destination destination, boolean immediate)
throws JMSException;
+
+ AMQShortString getTemporaryTopicExchangeName();
+
+ AMQShortString getDefaultQueueExchangeName();
+
+ AMQShortString getDefaultTopicExchangeName();
+
+ AMQShortString getTemporaryQueueExchangeName();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 7a76aa0002..4f6f1561b6 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
@@ -236,12 +237,12 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
if(value instanceof AMQShortString)
{
- return new AMQQueue((AMQShortString) value);
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, (AMQShortString) value);
}
else if (value instanceof String)
{
- return new AMQQueue(new AMQShortString((String) value));
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString((String) value));
}
else if (value instanceof BindingURL)
@@ -259,11 +260,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
if(value instanceof AMQShortString)
{
- return new AMQTopic((AMQShortString)value);
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, (AMQShortString)value);
}
else if (value instanceof String)
{
- return new AMQTopic(new AMQShortString((String) value));
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
}
else if (value instanceof BindingURL)