diff options
author | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:50:31 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:50:31 +0000 |
commit | ba3c83a6b74d8145ece6930272b880260e613a3c (patch) | |
tree | 84a8b4b02a51cd529d8973d8ce88dcfc4a7f060c /java/client/src | |
parent | d5fd6b83251a81d29296149d85d8e846e7731be2 (diff) | |
download | qpid-python-ba3c83a6b74d8145ece6930272b880260e613a3c.tar.gz |
QPID-2418: Unsubscribe existing open durable subscriptions when changing subscription. Remove duplication in implementations.
Applied patch from Andrew Kennedy <andrew.international@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992855 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
4 files changed, 241 insertions, 220 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 55b56444de..b96b32d990 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,8 +20,49 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -45,7 +86,6 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; @@ -56,25 +96,8 @@ import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; -import org.apache.qpid.url.AMQBindingURL; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p/><table id="crc"><caption>CRC Card</caption> @@ -172,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - final AMQSession _thisSession = this; + final AMQSession<C, P> _thisSession = this; /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -261,15 +284,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * keeps a record of subscriptions which have been created in the current instance. It does not remember * subscriptions between executions of the client. */ - protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = + new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = - new ConcurrentHashMap<C, String>(); + protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); + + /** + * Locks to keep access to subscriber details atomic. + * <p> + * Added for QPID2418 + */ + protected final Lock _subscriberDetails = new ReentrantLock(true); + protected final Lock _subscriberAccess = new ReentrantLock(true); /** * Used to hold incoming messages. @@ -311,9 +341,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - /** * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes @@ -670,9 +697,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_logger.isInfoEnabled()) { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); _logger.info("Closing session: " + this); // + ":" - // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } // Ensure we only try and close an open session. @@ -991,24 +1018,100 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic false); } - public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; - - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method + return createDurableSubscriber(topic, name, null, false); + } + + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { checkNotClosed(); - checkValidTopic(topic, true); - if (_subscriptions.containsKey(name)) + AMQTopic origTopic = checkValidTopic(topic, true); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + + String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; + + _subscriberDetails.lock(); + try { - _subscriptions.get(name).close(); - } - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - C consumer = (C) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name); + + // Not subscribed to this name in the current session + if (subscriber == null) + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } else + { + topicName = new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + + "' for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + else + { + // Subscribed with the same topic and no current / previous or same selector + if (subscriber.getTopic().equals(topic) + && ((messageSelector == null && subscriber.getMessageSelector() == null) + || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector())))) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name + + (messageSelector != null ? " and selector " + messageSelector : "")); + } + else + { + unsubscribe(name, true); + } + } + + _subscriberAccess.lock(); + try + { + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + subscriber = new TopicSubscriberAdaptor<C>(dest, consumer); - return subscriber; + // Save subscription information + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + } + finally + { + _subscriberAccess.unlock(); + } + + return subscriber; + } + finally + { + _subscriberDetails.unlock(); + } } public MapMessage createMapMessage() throws JMSException @@ -1732,23 +1835,51 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // } } - - /*public void setTicket(int ticket) - { - _ticket = ticket; - }*/ - + + /** + * @see #unsubscribe(String, boolean) + */ public void unsubscribe(String name) throws JMSException { - checkNotClosed(); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + unsubscribe(name, false); + } + + /** + * Unsubscribe from a subscription. + * + * @param name the name of the subscription to unsubscribe + * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the + * queue is not bound, possibly due to the subscription being closed. + * @throws JMSException on + * @throws InvalidDestinationException + */ + private void unsubscribe(String name, boolean safe) throws JMSException + { + TopicSubscriberAdaptor<C> subscriber; + + _subscriberDetails.lock(); + try + { + checkNotClosed(); + subscriber = _subscriptions.get(name); + if (subscriber != null) + { + // Remove saved subscription information + _subscriptions.remove(name); + _reverseSubscriptionMap.remove(subscriber.getMessageConsumer()); + } + } + finally + { + _subscriberDetails.unlock(); + } + if (subscriber != null) { subscriber.close(); + // send a queue.delete for the subscription deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - _subscriptions.remove(name); - _reverseSubscriptionMap.remove(subscriber); } else { @@ -1763,7 +1894,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + " Requesting queue deletion regardless."); } - + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else // Queue Browser @@ -1773,9 +1904,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } - else + else if (!safe) { - throw new InvalidDestinationException("Unknown subscription exchange:" + name); + throw new InvalidDestinationException("Unknown subscription name: " + name); } } } @@ -1889,10 +2020,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_consumers.remove(consumer.getConsumerTag()) != null) { - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) + _subscriberAccess.lock(); + try + { + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if (subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } + } + finally { - _subscriptions.remove(subscriptionName); + _subscriberAccess.unlock(); } Destination dest = consumer.getDestination(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 41ad9f543e..75db5d5673 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -34,10 +34,7 @@ import java.util.TimerTask; import java.util.UUID; import javax.jms.Destination; -import javax.jms.IllegalStateException; import javax.jms.JMSException; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; @@ -916,72 +913,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic }, _connection).execute(); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic=checkValidTopic(topic, true); - AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - - TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName=((AMQTopic) topic).getBindingKeys()[0]; - } - else - { - topicName=new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - protected Long requestQueueDepth(AMQDestination amqd) { return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 939e29e4d5..89fbd66e71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,31 +21,63 @@ package org.apache.qpid.client; -import javax.jms.*; -import javax.jms.IllegalStateException; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.ReturnMessage; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicAckBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -442,74 +474,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic, true); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName = ((AMQTopic) topic).getRoutingKey(); - } - else - { - topicName = new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - - - - public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException { new FailoverRetrySupport<Object, AMQException>( diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 5f3f4e7d19..2704393cc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -46,11 +46,6 @@ public class AMQTopic extends AMQDestination implements Topic super(binding); } -// public AMQTopic(String exchangeName, String routingKey) -// { -// this(new AMQShortString(exchangeName), new AMQShortString(routingKey)); -// } - public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) { super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); @@ -79,8 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, - queueName, isDurable); + super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, @@ -103,13 +97,6 @@ public class AMQTopic extends AMQDestination implements Topic true); } - public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection) - throws JMSException - { - return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false, - getDurableTopicQueueName(subscriptionName, connection), true); - } - public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { return new AMQShortString(connection.getClientID() + ":" + subscriptionName); |