diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-16 23:11:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-16 23:11:41 +0000 |
| commit | dd8df96fcca8f5f9dcbe91ba012cff400a38daa7 (patch) | |
| tree | ee84d98ec82abd31dd486f98fea1cb6bdb526db5 /java/client/src/main | |
| parent | 6213309b7c179fdddfeca0273d5c1f6592adedd7 (diff) | |
| download | qpid-python-dd8df96fcca8f5f9dcbe91ba012cff400a38daa7.tar.gz | |
QPID-375 : remove assumptions on standard exchanges (amq.direct, amq.topic, etc), allow other exchanges to be created through virtualhosts.xml
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508649 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
12 files changed, 305 insertions, 100 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ebaa22ce44..2030876952 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,37 +20,6 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -59,6 +28,8 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -73,6 +44,25 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -157,12 +147,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * The connection meta data */ private QpidConnectionMetaData _connectionMetaData; - + /** * Configuration info for SSL */ private SSLConfiguration _sslConfiguration; + private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + /** * @param broker brokerdetails * @param username username @@ -180,7 +175,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); } - + /** * @param broker brokerdetails * @param username username @@ -198,20 +193,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } - + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } - + public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(host, port, false, username, password, clientName, virtualHost, sslConfig); + this(host, port, false, username, password, clientName, virtualHost, sslConfig); } - + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException @@ -234,12 +229,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { this(new AMQConnectionURL(connection), null); } - + public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(connection), sslConfig); } - + public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { @@ -257,6 +252,28 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); + + if (connectionURL.getDefaultQueueExchangeName() != null) + { + _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); + } + + if (connectionURL.getDefaultTopicExchangeName() != null) + { + _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName(); + } + + if (connectionURL.getTemporaryQueueExchangeName() != null) + { + _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName(); + } + + if (connectionURL.getTemporaryTopicExchangeName() != null) + { + _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); + } + + _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); @@ -440,7 +457,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } @@ -1070,9 +1087,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } - + public SSLConfiguration getSSLConfiguration() { - return _sslConfiguration; + return _sslConfiguration; + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _defaultTopicExchangeName; + } + + + public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) + { + _defaultTopicExchangeName = defaultTopicExchangeName; + } + + + public AMQShortString getDefaultQueueExchangeName() + { + return _defaultQueueExchangeName; + } + + + public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) + { + _defaultQueueExchangeName = defaultQueueExchangeName; + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _temporaryTopicExchangeName; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates. + } + + + public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) + { + _temporaryTopicExchangeName = temporaryTopicExchangeName; + } + + public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName) + { + _temporaryQueueExchangeName = temporaryQueueExchangeName; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index fea83d3128..0dcc544ea8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -27,11 +33,6 @@ import java.util.LinkedList; import java.util.List; import java.util.StringTokenizer; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - public class AMQConnectionURL implements ConnectionURL { private String _url; @@ -43,6 +44,11 @@ public class AMQConnectionURL implements ConnectionURL private String _username; private String _password; private String _virtualHost; + private AMQShortString _defaultQueueExchangeName; + private AMQShortString _defaultTopicExchangeName; + private AMQShortString _temporaryTopicExchangeName; + private AMQShortString _temporaryQueueExchangeName; + public AMQConnectionURL(String fullURL) throws URLSyntaxException { @@ -107,7 +113,7 @@ public class AMQConnectionURL implements ConnectionURL if (userInfo == null) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, - "User information not found on url", fullURL); + "User information not found on url", fullURL); } else { @@ -161,7 +167,9 @@ public class AMQConnectionURL implements ConnectionURL { if (slash != 0 && fullURL.charAt(slash - 1) == ':') { - throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, + "Virtual host looks like a windows path, forward slash not allowed in URL", + fullURL); } else { @@ -181,7 +189,7 @@ public class AMQConnectionURL implements ConnectionURL if (colonIndex == -1) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); + "Null password in user information not allowed.", _url); } else { @@ -230,6 +238,29 @@ public class AMQConnectionURL implements ConnectionURL _options.remove(OPTIONS_FAILOVER); } + + if (_options.containsKey(OPTIONS_DEFAULT_TOPIC_EXCHANGE)) + { + _defaultTopicExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_TOPIC_EXCHANGE)); + } + + + if (_options.containsKey(OPTIONS_DEFAULT_QUEUE_EXCHANGE)) + { + _defaultQueueExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_QUEUE_EXCHANGE)); + } + + + if (_options.containsKey(OPTIONS_TEMPORARY_QUEUE_EXCHANGE)) + { + _temporaryQueueExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_QUEUE_EXCHANGE)); + } + + + if (_options.containsKey(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)) + { + _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)); + } } public String getURL() @@ -332,6 +363,26 @@ public class AMQConnectionURL implements ConnectionURL _options.put(key, value); } + public AMQShortString getDefaultQueueExchangeName() + { + return _defaultQueueExchangeName; + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _defaultTopicExchangeName; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _temporaryQueueExchangeName; + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _temporaryTopicExchangeName; + } + public String toString() { StringBuffer sb = new StringBuffer(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index a994dbc670..661372845a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -145,12 +145,12 @@ public abstract class AMQDestination implements Destination, Referenceable public boolean isTopic() { - return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName); + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass); } public boolean isQueue() { - return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName); + return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass); } public AMQShortString getDestinationName() @@ -411,11 +411,11 @@ public abstract class AMQDestination implements Destination, Referenceable if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { - return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable); + return new AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { - return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable); + return new AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable); } else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 44328e3555..9185bc87e8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import javax.jms.Queue; +import javax.jms.Connection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -43,9 +44,19 @@ public class AMQQueue extends AMQDestination implements Queue * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. * @param name the name of the queue */ - public AMQQueue(AMQShortString name) + public AMQQueue(AMQShortString exchangeName, String name) { - this(name, false); + this(exchangeName, new AMQShortString(name)); + } + + + /** + * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. + * @param name the name of the queue + */ + public AMQQueue(AMQShortString exchangeName, AMQShortString name) + { + this(exchangeName, name, false); } public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) @@ -58,9 +69,20 @@ public class AMQQueue extends AMQDestination implements Queue * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. * @param name the name of the queue */ - public AMQQueue(String name) + public AMQQueue(String exchangeName, String name) { - this(new AMQShortString(name), false); + this(new AMQShortString(exchangeName), new AMQShortString(name), false); + } + + + public AMQQueue(AMQConnection connection, String name) + { + this(connection.getDefaultQueueExchangeName(),name); + } + + public AMQQueue(AMQConnection connection, String name, boolean temporary) + { + this(connection.getDefaultQueueExchangeName(), new AMQShortString(name),temporary); } @@ -71,9 +93,9 @@ public class AMQQueue extends AMQDestination implements Queue * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted * and exclusive */ - public AMQQueue(String name, boolean temporary) + public AMQQueue(String exchangeName, String name, boolean temporary) { - this(new AMQShortString(name),temporary); + this(new AMQShortString(exchangeName), new AMQShortString(name),temporary); } @@ -84,11 +106,11 @@ public class AMQQueue extends AMQDestination implements Queue * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted * and exclusive */ - public AMQQueue(AMQShortString name, boolean temporary) + public AMQQueue(AMQShortString exchangeName, AMQShortString name, boolean temporary) { // queue name is set to null indicating that the broker assigns a name in the case of temporary queues // temporary queues are typically used as response queues - this(name, temporary?null:name, temporary, temporary, !temporary); + this(exchangeName, name, temporary?null:name, temporary, temporary, !temporary); } @@ -99,19 +121,20 @@ public class AMQQueue extends AMQDestination implements Queue * @param exclusive true if the queue should only permit a single consumer * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches */ - public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) + public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete) { - this(destinationName, queueName, exclusive, autoDelete, false); + this(exchangeName, destinationName, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) + public AMQQueue(AMQShortString exchangeName, AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { - super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, autoDelete, queueName, durable); } - + + public AMQShortString getRoutingKey() { return getAMQQueueName(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3973b5dd71..7ab26f3b47 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1312,7 +1312,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); if (queueName.indexOf('/') == -1) { - return new AMQQueue(queueName); + return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); } else { @@ -1330,6 +1330,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public AMQShortString getDefaultQueueExchangeName() + { + return _connection.getDefaultQueueExchangeName(); + } + /** * Creates a QueueReceiver wrapping a MessageConsumer * @@ -1379,7 +1384,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (topicName.indexOf('/') == -1) { - return new AMQTopic(new AMQShortString(topicName)); + return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName)); } else { @@ -1397,6 +1402,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public AMQShortString getDefaultTopicExchangeName() + { + return _connection.getDefaultTopicExchangeName(); + } + /** * Creates a non-durable subscriber * @@ -1409,8 +1419,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = new AMQTopic(topic.getTopicName()); + AMQTopic dest = checkValidTopic(topic); + //AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1428,16 +1438,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = new AMQTopic(topic.getTopicName()); + AMQTopic dest = checkValidTopic(topic); + //AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); TopicSubscriberAdaptor subscriber = _subscriptions.get(name); if (subscriber != null) { @@ -1464,8 +1474,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getAMQQueueName()) && - !isQueueBound(dest.getAMQQueueName(), topicName)) + if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -1556,7 +1566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) + if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) { deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } @@ -1567,17 +1577,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - boolean isQueueBound(AMQShortString queueName) throws JMSException + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException { - return isQueueBound(queueName, null); + return isQueueBound(exchangeName, queueName, null); } - boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + exchangeName, // exchange queueName, // queue routingKey); // routingKey AMQMethodEvent response = null; @@ -1858,7 +1868,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /* * I could have combined the last 3 methods, but this way it improves readability */ - private void checkValidTopic(Topic topic) throws JMSException + private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) { @@ -1866,8 +1876,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) { - throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); + throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); } + if(!(topic instanceof AMQTopic)) + { + throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); + } + return (AMQTopic) topic; } private void checkValidQueue(Queue queue) throws InvalidDestinationException @@ -1887,6 +1902,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + public AMQShortString getTemporaryTopicExchangeName() + { + return _connection.getTemporaryTopicExchangeName(); + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _connection.getTemporaryQueueExchangeName(); + } + + + public int getTicket() { return _ticket; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index c350eb0c45..ce8e14506f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -40,7 +40,7 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor */ public AMQTemporaryQueue(AMQSession session) { - super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true); + super(session.getTemporaryQueueExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true); _session = session; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java index 241a9abc9b..6c954ec3df 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; + import javax.jms.JMSException; import javax.jms.TemporaryTopic; @@ -36,7 +38,7 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDes */ public AMQTemporaryTopic(AMQSession session) { - super("TempQueue" + Long.toString(System.currentTimeMillis())); + super(session.getTemporaryTopicExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis()))); _session = session; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 7d84ec6470..319e728edf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -39,32 +39,43 @@ public class AMQTopic extends AMQDestination implements Topic super(binding); } - public AMQTopic(String name) - { - this(new AMQShortString(name)); - } +// public AMQTopic(String exchangeName, String routingKey) +// { +// this(new AMQShortString(exchangeName), new AMQShortString(routingKey)); +// } public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) { super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); } + public AMQTopic(AMQConnection conn, String routingKey) + { + this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); + } + + + public AMQTopic(AMQShortString exchangeName, String routingKey) + { + this(exchangeName, new AMQShortString(routingKey)); + } - public AMQTopic(AMQShortString name) + public AMQTopic(AMQShortString exchangeName, AMQShortString routingKey) { - this(name, true, null, false); + this(exchangeName, routingKey, null); } - public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, + super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { - return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection), + return new AMQTopic(topic.getExchangeName(), topic.getDestinationName(), false, + getDurableTopicQueueName(subscriptionName, connection), true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 3f8c1f65f8..19382b58c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -380,4 +380,9 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag } } + public String toString() + { + return String.valueOf(System.identityHashCode(this)); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index caadb0f621..2d91e290c4 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.jms; +import org.apache.qpid.framing.AMQShortString; + import java.util.List; /** @@ -35,6 +37,10 @@ public interface ConnectionURL public static final String OPTIONS_FAILOVER = "failover"; public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; public static final String OPTIONS_SSL = "ssl"; + public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; + public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; + public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; + public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; String getURL(); @@ -69,4 +75,12 @@ public interface ConnectionURL String getOption(String key); void setOption(String key, String value); + + AMQShortString getDefaultQueueExchangeName(); + + AMQShortString getDefaultTopicExchangeName(); + + AMQShortString getTemporaryQueueExchangeName(); + + AMQShortString getTemporaryTopicExchangeName(); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/Session.java b/java/client/src/main/java/org/apache/qpid/jms/Session.java index 025aef66c8..5287381fae 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/Session.java +++ b/java/client/src/main/java/org/apache/qpid/jms/Session.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.jms; +import org.apache.qpid.framing.AMQShortString; + import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -88,4 +90,12 @@ public interface Session extends javax.jms.Session */ MessageProducer createProducer(Destination destination, boolean immediate) throws JMSException; + + AMQShortString getTemporaryTopicExchangeName(); + + AMQShortString getDefaultQueueExchangeName(); + + AMQShortString getDefaultTopicExchangeName(); + + AMQShortString getTemporaryQueueExchangeName(); } diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 7a76aa0002..4f6f1561b6 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -47,6 +47,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.exchange.ExchangeDefaults; public class PropertiesFileInitialContextFactory implements InitialContextFactory { @@ -236,12 +237,12 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { if(value instanceof AMQShortString) { - return new AMQQueue((AMQShortString) value); + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, (AMQShortString) value); } else if (value instanceof String) { - return new AMQQueue(new AMQShortString((String) value)); + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString((String) value)); } else if (value instanceof BindingURL) @@ -259,11 +260,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { if(value instanceof AMQShortString) { - return new AMQTopic((AMQShortString)value); + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, (AMQShortString)value); } else if (value instanceof String) { - return new AMQTopic(new AMQShortString((String) value)); + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value)); } else if (value instanceof BindingURL) |
