summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java261
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java69
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java116
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java15
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java71
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java272
7 files changed, 548 insertions, 266 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 55b56444de..b96b32d990 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,8 +20,49 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
@@ -45,7 +86,6 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
@@ -56,25 +96,8 @@ import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.url.AMQBindingURL;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -172,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- final AMQSession _thisSession = this;
+ final AMQSession<C, P> _thisSession = this;
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -261,15 +284,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* keeps a record of subscriptions which have been created in the current instance. It does not remember
* subscriptions between executions of the client.
*/
- protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
- new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+ protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<C, String>();
+ protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
+
+ /**
+ * Locks to keep access to subscriber details atomic.
+ * <p>
+ * Added for QPID2418
+ */
+ protected final Lock _subscriberDetails = new ReentrantLock(true);
+ protected final Lock _subscriberAccess = new ReentrantLock(true);
/**
* Used to hold incoming messages.
@@ -311,9 +341,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
- //Map<AMQShortString, BasicMessageConsumer> _consumers =
- //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
-
/**
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
@@ -670,9 +697,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_logger.isInfoEnabled())
{
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
_logger.info("Closing session: " + this); // + ":"
- // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
// Ensure we only try and close an open session.
@@ -991,24 +1018,100 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
false);
}
- public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
-
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+ // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method
+ return createDurableSubscriber(topic, name, null, false);
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
throws JMSException
{
checkNotClosed();
- checkValidTopic(topic, true);
- if (_subscriptions.containsKey(name))
+ AMQTopic origTopic = checkValidTopic(topic, true);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+
+ String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
+
+ _subscriberDetails.lock();
+ try
{
- _subscriptions.get(name).close();
- }
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- C consumer = (C) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
+
+ // Not subscribed to this name in the current session
+ if (subscriber == null)
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ } else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName
+ + "' for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+ else
+ {
+ // Subscribed with the same topic and no current / previous or same selector
+ if (subscriber.getTopic().equals(topic)
+ && ((messageSelector == null && subscriber.getMessageSelector() == null)
+ || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector()))))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name
+ + (messageSelector != null ? " and selector " + messageSelector : ""));
+ }
+ else
+ {
+ unsubscribe(name, true);
+ }
+ }
+
+ _subscriberAccess.lock();
+ try
+ {
+ C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);
- return subscriber;
+ // Save subscription information
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ }
+ finally
+ {
+ _subscriberAccess.unlock();
+ }
+
+ return subscriber;
+ }
+ finally
+ {
+ _subscriberDetails.unlock();
+ }
}
public MapMessage createMapMessage() throws JMSException
@@ -1732,23 +1835,51 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// }
}
-
- /*public void setTicket(int ticket)
- {
- _ticket = ticket;
- }*/
-
+
+ /**
+ * @see #unsubscribe(String, boolean)
+ */
public void unsubscribe(String name) throws JMSException
{
- checkNotClosed();
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ unsubscribe(name, false);
+ }
+
+ /**
+ * Unsubscribe from a subscription.
+ *
+ * @param name the name of the subscription to unsubscribe
+ * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the
+ * queue is not bound, possibly due to the subscription being closed.
+ * @throws JMSException on
+ * @throws InvalidDestinationException
+ */
+ private void unsubscribe(String name, boolean safe) throws JMSException
+ {
+ TopicSubscriberAdaptor<C> subscriber;
+
+ _subscriberDetails.lock();
+ try
+ {
+ checkNotClosed();
+ subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ // Remove saved subscription information
+ _subscriptions.remove(name);
+ _reverseSubscriptionMap.remove(subscriber.getMessageConsumer());
+ }
+ }
+ finally
+ {
+ _subscriberDetails.unlock();
+ }
+
if (subscriber != null)
{
subscriber.close();
+
// send a queue.delete for the subscription
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
- _subscriptions.remove(name);
- _reverseSubscriptionMap.remove(subscriber);
}
else
{
@@ -1763,7 +1894,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
+ " Requesting queue deletion regardless.");
}
-
+
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else // Queue Browser
@@ -1773,9 +1904,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
- else
+ else if (!safe)
{
- throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+ throw new InvalidDestinationException("Unknown subscription name: " + name);
}
}
}
@@ -1889,10 +2020,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_consumers.remove(consumer.getConsumerTag()) != null)
{
- String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if (subscriptionName != null)
+ _subscriberAccess.lock();
+ try
+ {
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if (subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
+ }
+ finally
{
- _subscriptions.remove(subscriptionName);
+ _subscriberAccess.unlock();
}
Destination dest = consumer.getDestination();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 41ad9f543e..75db5d5673 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -34,10 +34,7 @@ import java.util.TimerTask;
import java.util.UUID;
import javax.jms.Destination;
-import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
@@ -916,72 +913,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}, _connection).execute();
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic=checkValidTopic(topic, true);
- AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
-
- TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName=((AMQTopic) topic).getBindingKeys()[0];
- }
- else
- {
- topicName=new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
-
protected Long requestQueueDepth(AMQDestination amqd)
{
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 939e29e4d5..89fbd66e71 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,31 +21,63 @@
package org.apache.qpid.client;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -442,74 +474,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic, true);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
-
-
-
-
public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
{
new FailoverRetrySupport<Object, AMQException>(
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 5f3f4e7d19..2704393cc2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -46,11 +46,6 @@ public class AMQTopic extends AMQDestination implements Topic
super(binding);
}
-// public AMQTopic(String exchangeName, String routingKey)
-// {
-// this(new AMQShortString(exchangeName), new AMQShortString(routingKey));
-// }
-
public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
{
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
@@ -79,8 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
- super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
- queueName, isDurable);
+ super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable);
}
protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
@@ -103,13 +97,6 @@ public class AMQTopic extends AMQDestination implements Topic
true);
}
- public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection)
- throws JMSException
- {
- return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
- getDurableTopicQueueName(subscriptionName, connection), true);
- }
-
public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index cbc2078571..abb0781536 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.queue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -35,14 +38,11 @@ import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-
public class TimeToLiveTest extends QpidBrokerTestCase
{
private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
@@ -253,7 +253,7 @@ public class TimeToLiveTest extends QpidBrokerTestCase
producerSession.commit();
//resubscribe
- durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName());
+ durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
// Ensure we sleep the required amount of time.
ReentrantLock waitLock = new ReentrantLock();
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
index 3030572e13..989ac98747 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
@@ -17,7 +17,17 @@
*/
package org.apache.qpid.test.unit.ct;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
@@ -75,7 +85,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
}
catch (Exception e)
{
- System.out.println("problems shutting down arjuna-ms");
+ _logger.error("problems restarting broker: " + e);
throw e;
}
//now recreate the durable subscriber and check the received messages
@@ -102,7 +112,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
* create and register a durable subscriber with a message selector and then close it
* crash the broker
* create a publisher and send 5 right messages and 5 wrong messages
- * recreate the durable subscriber and check the received the 5 expected messages
+ * recreate the durable subscriber and check we receive the 5 expected messages
*/
public void testDurSubRestoresMessageSelector() throws Exception
{
@@ -125,7 +135,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
}
catch (Exception e)
{
- System.out.println("problems shutting down arjuna-ms");
+ _logger.error("problems restarting broker: " + e);
throw e;
}
topic = (Topic) getInitialContext().lookup(_topicName);
@@ -148,7 +158,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
//now recreate the durable subscriber and check the received messages
TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub");
+ TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false);
durConnection2.start();
for (int i = 0; i < 5; i++)
{
@@ -385,6 +395,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart1",
((TextMessage) rMsg).getText());
+
+ // Queue has no messages left
+ AMQQueue subQueueTmp = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueueTmp));
rMsg = subA.receive(1000);
assertNull(rMsg);
@@ -403,14 +417,14 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
// Reconnect with new selector that matches B
TopicSubscriber subB = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorAndRestart","Match = False", false);
+ "testResubscribeWithChangedSelectorAndRestart",
+ "Match = false", false);
//verify no messages are now present on the queue as changing selector should have issued
//an unsubscribe and thus deleted the previous durable backing queue for the subscription.
- //check the dur sub's underlying queue now has msg count 1
- AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector");
- assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue));
-
+ //check the dur sub's underlying queue now has msg count 0
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
// Check that new messages are received properly
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
@@ -429,6 +443,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
rMsg = subB.receive(1000);
assertNull(rMsg);
+ //check the dur sub's underlying queue now has msg count 0
+ subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
+
//now restart the server
try
{
@@ -440,28 +458,49 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
throw e;
}
- // Check that new messages are still received properly
+ // Reconnect to broker
+ Connection connection = getConnectionFactory().createConnection("guest", "guest");
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = new AMQTopic((AMQConnection) connection, "testResubscribeWithChangedSelectorAndRestart");
+ producer = session.createProducer(topic);
+
+ //verify no messages now present on the queue after we restart the broker
+ //check the dur sub's underlying queue now has msg count 0
+ subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subC = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorAndRestart",
+ "Match = False", false);
+
+ // Check that new messages are still sent and recieved properly
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
msg.setBooleanProperty("Match", true);
producer.send(msg);
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
msg.setBooleanProperty("Match", false);
producer.send(msg);
+
+ //check the dur sub's underlying queue now has msg count 1
+ subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
- rMsg = subB.receive(1000);
+ rMsg = subC.receive(1000);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart2",
((TextMessage) rMsg).getText());
- rMsg = subB.receive(1000);
+ rMsg = subC.receive(1000);
assertNull(rMsg);
session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
- subB.close();
+
+ subC.close();
session.close();
- conn.close();
+ connection.close();
}
-
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index d1132c14fb..3dd3c72024 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -23,16 +23,6 @@ package org.apache.qpid.test.unit.topic;
import java.io.IOException;
import java.util.Set;
-import org.apache.qpid.management.common.JMXConnnectionFactory;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
@@ -48,6 +38,15 @@ import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.management.common.JMXConnnectionFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
* a static on a base test helper class, e.g. TestUtils.
@@ -118,11 +117,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
- ((AMQSession)session1).sync();
+ ((AMQSession<?, ?>) session1).sync();
//check the dur sub's underlying queue now has msg count 1
AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
- assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue));
+ assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue));
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
@@ -141,16 +140,16 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
- ((AMQSession)session2).sync();
+ ((AMQSession<?, ?>) session2).sync();
//check the dur sub's underlying queue now has msg count 0
- assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue));
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue));
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
- ((AMQSession)session2).sync();
+ ((AMQSession<?, ?>) session2).sync();
if(isJavaBroker() && isExternalBroker())
{
@@ -435,7 +434,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
con3.close();
}
- /***
+ /**
* This tests the fix for QPID-1085
* Creates a durable subscriber with an invalid selector, checks that the
* exception is thrown correctly and that the subscription is not created.
@@ -472,7 +471,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
session.unsubscribe("testDurableWithInvalidSelectorSub");
}
- /***
+ /**
* This tests the fix for QPID-1085
* Creates a durable subscriber with an invalid destination, checks that the
* exception is thrown correctly and that the subscription is not created.
@@ -509,9 +508,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
}
/**
- * Tests QPID-1202
* Creates a durable subscription with a selector, then changes that selector on resubscription
- * @throws Exception
+ * <p>
+ * QPID-1202, QPID-2418
*/
public void testResubscribeWithChangedSelector() throws Exception
{
@@ -544,8 +543,14 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
// Reconnect with new selector that matches B
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
-
- // Check messages are received properly
+
+ //verify no messages are now present as changing selector should have issued
+ //an unsubscribe and thus deleted the previous backing queue for the subscription.
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Should not have received message as the queue underlying the " +
+ "subscription should have been cleared/deleted when the selector was changed", rMsg);
+
+ // Check that new messages are received properly
sendMatchingAndNonMatchingMessage(session, producer);
rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
@@ -594,9 +599,226 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg.setBooleanProperty("Match", false);
producer.send(msg);
}
-
- public static junit.framework.Test suite()
+
+
+ /**
+ * create and register a durable subscriber with a message selector and then close it
+ * create a publisher and send 5 right messages and 5 wrong messages
+ * create another durable subscriber with the same selector and name
+ * check messages are still there
+ * <p>
+ * QPID-2418
+ */
+ public void testDurSubSameMessageSelector() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector");
+
+ //create and register a durable subscriber with a message selector and then close it
+ TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
+ subOne.close();
+
+ MessageProducer producer = session.createProducer(topic);
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = session.createMessage();
+ message.setBooleanProperty("testprop", true);
+ producer.send(message);
+ message = session.createMessage();
+ message.setBooleanProperty("testprop", false);
+ producer.send(message);
+ }
+ producer.close();
+
+ // now recreate the durable subscriber and check the received messages
+ TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
+
+ // should be 5 messages on queue now
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector");
+ assertEquals("Queue depth is wrong", 5, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = subTwo.receive(1000);
+ if (message == null)
+ {
+ fail("sameMessageSelector test failed. no message was returned");
+ }
+ else
+ {
+ assertEquals("sameMessageSelector test failed. message selector not reset",
+ "true", message.getStringProperty("testprop"));
+ }
+ }
+
+ // Check queue has no messages
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ // Unsubscribe
+ session.unsubscribe("sameMessageSelector");
+
+ conn.close();
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with a message selector
+ * <li>create another durable subscriber with a different selector and same name
+ * <li>check first subscriber is now closed
+ * <li>create a publisher and send messages
+ * <li>check messages are recieved correctly
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ public void testResubscribeWithChangedSelectorNoClose() throws Exception
{
- return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorNoClose");
+
+ // Create durable subscriber that matches A
+ TopicSubscriber subA = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorNoClose",
+ "Match = True", false);
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subB = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorNoClose",
+ "Match = false", false);
+
+ // First subscription has been closed
+ try
+ {
+ subA.receive(1000);
+ fail("First subscription was not closed");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ // Send 1 matching message and 1 non-matching message
+ MessageProducer producer = session.createProducer(topic);
+ TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ // should be 1 message on queue now
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
+ assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ Message rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(1000);
+ assertNull(rMsg);
+
+ // Check queue has no messages
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ conn.close();
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with no message selector
+ * <li>create another durable subscriber with a selector and same name
+ * <li>check first subscriber is now closed
+ * <li>create a publisher and send messages
+ * <li>check messages are recieved correctly
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ public void testDurSubAddMessageSelectorNoClose() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName");
+
+ // create and register a durable subscriber with no message selector
+ TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false);
+
+ // now create a durable subscriber with a selector
+ TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false);
+
+ // First subscription has been closed
+ try
+ {
+ subOne.receive(1000);
+ fail("First subscription was not closed");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ // Send 1 matching message and 1 non-matching message
+ MessageProducer producer = session.createProducer(topic);
+ TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("testprop", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("testprop", false);
+ producer.send(msg);
+
+ // should be 1 message on queue now
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName");
+ assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ Message rMsg = subTwo.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart1",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subTwo.receive(1000);
+ assertNull(rMsg);
+
+ // Check queue has no messages
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ conn.close();
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with no message selector
+ * <li>try to create another durable with the same name, should fail
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ public void testDurSubNoSelectorResubscribeNoClose() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName");
+
+ // create and register a durable subscriber with no message selector
+ session.createDurableSubscriber(topic, "subscriptionName", null, false);
+
+ // try to recreate the durable subscriber
+ try
+ {
+ session.createDurableSubscriber(topic, "subscriptionName", null, false);
+ fail("Subscription should not have been created");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
}