summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-09-05 18:50:31 +0000
committerRobert Gemmell <robbie@apache.org>2010-09-05 18:50:31 +0000
commitba3c83a6b74d8145ece6930272b880260e613a3c (patch)
tree84a8b4b02a51cd529d8973d8ce88dcfc4a7f060c /java/client/src
parentd5fd6b83251a81d29296149d85d8e846e7731be2 (diff)
downloadqpid-python-ba3c83a6b74d8145ece6930272b880260e613a3c.tar.gz
QPID-2418: Unsubscribe existing open durable subscriptions when changing subscription. Remove duplication in implementations.
Applied patch from Andrew Kennedy <andrew.international@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992855 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java261
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java69
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java116
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java15
4 files changed, 241 insertions, 220 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 55b56444de..b96b32d990 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,8 +20,49 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
@@ -45,7 +86,6 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
@@ -56,25 +96,8 @@ import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.url.AMQBindingURL;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -172,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- final AMQSession _thisSession = this;
+ final AMQSession<C, P> _thisSession = this;
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -261,15 +284,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* keeps a record of subscriptions which have been created in the current instance. It does not remember
* subscriptions between executions of the client.
*/
- protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
- new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+ protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<C, String>();
+ protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
+
+ /**
+ * Locks to keep access to subscriber details atomic.
+ * <p>
+ * Added for QPID2418
+ */
+ protected final Lock _subscriberDetails = new ReentrantLock(true);
+ protected final Lock _subscriberAccess = new ReentrantLock(true);
/**
* Used to hold incoming messages.
@@ -311,9 +341,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
- //Map<AMQShortString, BasicMessageConsumer> _consumers =
- //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
-
/**
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
@@ -670,9 +697,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_logger.isInfoEnabled())
{
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
_logger.info("Closing session: " + this); // + ":"
- // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
// Ensure we only try and close an open session.
@@ -991,24 +1018,100 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
false);
}
- public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
-
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+ // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method
+ return createDurableSubscriber(topic, name, null, false);
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
throws JMSException
{
checkNotClosed();
- checkValidTopic(topic, true);
- if (_subscriptions.containsKey(name))
+ AMQTopic origTopic = checkValidTopic(topic, true);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+
+ String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
+
+ _subscriberDetails.lock();
+ try
{
- _subscriptions.get(name).close();
- }
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- C consumer = (C) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
+
+ // Not subscribed to this name in the current session
+ if (subscriber == null)
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ } else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName
+ + "' for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+ else
+ {
+ // Subscribed with the same topic and no current / previous or same selector
+ if (subscriber.getTopic().equals(topic)
+ && ((messageSelector == null && subscriber.getMessageSelector() == null)
+ || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector()))))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name
+ + (messageSelector != null ? " and selector " + messageSelector : ""));
+ }
+ else
+ {
+ unsubscribe(name, true);
+ }
+ }
+
+ _subscriberAccess.lock();
+ try
+ {
+ C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);
- return subscriber;
+ // Save subscription information
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ }
+ finally
+ {
+ _subscriberAccess.unlock();
+ }
+
+ return subscriber;
+ }
+ finally
+ {
+ _subscriberDetails.unlock();
+ }
}
public MapMessage createMapMessage() throws JMSException
@@ -1732,23 +1835,51 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// }
}
-
- /*public void setTicket(int ticket)
- {
- _ticket = ticket;
- }*/
-
+
+ /**
+ * @see #unsubscribe(String, boolean)
+ */
public void unsubscribe(String name) throws JMSException
{
- checkNotClosed();
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ unsubscribe(name, false);
+ }
+
+ /**
+ * Unsubscribe from a subscription.
+ *
+ * @param name the name of the subscription to unsubscribe
+ * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the
+ * queue is not bound, possibly due to the subscription being closed.
+ * @throws JMSException on
+ * @throws InvalidDestinationException
+ */
+ private void unsubscribe(String name, boolean safe) throws JMSException
+ {
+ TopicSubscriberAdaptor<C> subscriber;
+
+ _subscriberDetails.lock();
+ try
+ {
+ checkNotClosed();
+ subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ // Remove saved subscription information
+ _subscriptions.remove(name);
+ _reverseSubscriptionMap.remove(subscriber.getMessageConsumer());
+ }
+ }
+ finally
+ {
+ _subscriberDetails.unlock();
+ }
+
if (subscriber != null)
{
subscriber.close();
+
// send a queue.delete for the subscription
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
- _subscriptions.remove(name);
- _reverseSubscriptionMap.remove(subscriber);
}
else
{
@@ -1763,7 +1894,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
+ " Requesting queue deletion regardless.");
}
-
+
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else // Queue Browser
@@ -1773,9 +1904,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
- else
+ else if (!safe)
{
- throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+ throw new InvalidDestinationException("Unknown subscription name: " + name);
}
}
}
@@ -1889,10 +2020,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_consumers.remove(consumer.getConsumerTag()) != null)
{
- String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if (subscriptionName != null)
+ _subscriberAccess.lock();
+ try
+ {
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if (subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
+ }
+ finally
{
- _subscriptions.remove(subscriptionName);
+ _subscriberAccess.unlock();
}
Destination dest = consumer.getDestination();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 41ad9f543e..75db5d5673 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -34,10 +34,7 @@ import java.util.TimerTask;
import java.util.UUID;
import javax.jms.Destination;
-import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
@@ -916,72 +913,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}, _connection).execute();
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic=checkValidTopic(topic, true);
- AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
-
- TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName=((AMQTopic) topic).getBindingKeys()[0];
- }
- else
- {
- topicName=new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
-
protected Long requestQueueDepth(AMQDestination amqd)
{
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 939e29e4d5..89fbd66e71 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,31 +21,63 @@
package org.apache.qpid.client;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -442,74 +474,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic, true);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
-
-
-
-
public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
{
new FailoverRetrySupport<Object, AMQException>(
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 5f3f4e7d19..2704393cc2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -46,11 +46,6 @@ public class AMQTopic extends AMQDestination implements Topic
super(binding);
}
-// public AMQTopic(String exchangeName, String routingKey)
-// {
-// this(new AMQShortString(exchangeName), new AMQShortString(routingKey));
-// }
-
public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
{
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
@@ -79,8 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
- super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
- queueName, isDurable);
+ super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable);
}
protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
@@ -103,13 +97,6 @@ public class AMQTopic extends AMQDestination implements Topic
true);
}
- public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection)
- throws JMSException
- {
- return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
- getDurableTopicQueueName(subscriptionName, connection), true);
- }
-
public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
return new AMQShortString(connection.getClientID() + ":" + subscriptionName);