diff options
Diffstat (limited to 'java')
7 files changed, 548 insertions, 266 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 55b56444de..b96b32d990 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,8 +20,49 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; @@ -45,7 +86,6 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; @@ -56,25 +96,8 @@ import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; -import org.apache.qpid.url.AMQBindingURL; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p/><table id="crc"><caption>CRC Card</caption> @@ -172,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - final AMQSession _thisSession = this; + final AMQSession<C, P> _thisSession = this; /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -261,15 +284,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * keeps a record of subscriptions which have been created in the current instance. It does not remember * subscriptions between executions of the client. */ - protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = + new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = - new ConcurrentHashMap<C, String>(); + protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); + + /** + * Locks to keep access to subscriber details atomic. + * <p> + * Added for QPID2418 + */ + protected final Lock _subscriberDetails = new ReentrantLock(true); + protected final Lock _subscriberAccess = new ReentrantLock(true); /** * Used to hold incoming messages. @@ -311,9 +341,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - /** * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes @@ -670,9 +697,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_logger.isInfoEnabled()) { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); _logger.info("Closing session: " + this); // + ":" - // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } // Ensure we only try and close an open session. @@ -991,24 +1018,100 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic false); } - public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; - - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method + return createDurableSubscriber(topic, name, null, false); + } + + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { checkNotClosed(); - checkValidTopic(topic, true); - if (_subscriptions.containsKey(name)) + AMQTopic origTopic = checkValidTopic(topic, true); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + + String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; + + _subscriberDetails.lock(); + try { - _subscriptions.get(name).close(); - } - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - C consumer = (C) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name); + + // Not subscribed to this name in the current session + if (subscriber == null) + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } else + { + topicName = new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + + "' for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + else + { + // Subscribed with the same topic and no current / previous or same selector + if (subscriber.getTopic().equals(topic) + && ((messageSelector == null && subscriber.getMessageSelector() == null) + || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector())))) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name + + (messageSelector != null ? " and selector " + messageSelector : "")); + } + else + { + unsubscribe(name, true); + } + } + + _subscriberAccess.lock(); + try + { + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + subscriber = new TopicSubscriberAdaptor<C>(dest, consumer); - return subscriber; + // Save subscription information + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + } + finally + { + _subscriberAccess.unlock(); + } + + return subscriber; + } + finally + { + _subscriberDetails.unlock(); + } } public MapMessage createMapMessage() throws JMSException @@ -1732,23 +1835,51 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // } } - - /*public void setTicket(int ticket) - { - _ticket = ticket; - }*/ - + + /** + * @see #unsubscribe(String, boolean) + */ public void unsubscribe(String name) throws JMSException { - checkNotClosed(); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + unsubscribe(name, false); + } + + /** + * Unsubscribe from a subscription. + * + * @param name the name of the subscription to unsubscribe + * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the + * queue is not bound, possibly due to the subscription being closed. + * @throws JMSException on + * @throws InvalidDestinationException + */ + private void unsubscribe(String name, boolean safe) throws JMSException + { + TopicSubscriberAdaptor<C> subscriber; + + _subscriberDetails.lock(); + try + { + checkNotClosed(); + subscriber = _subscriptions.get(name); + if (subscriber != null) + { + // Remove saved subscription information + _subscriptions.remove(name); + _reverseSubscriptionMap.remove(subscriber.getMessageConsumer()); + } + } + finally + { + _subscriberDetails.unlock(); + } + if (subscriber != null) { subscriber.close(); + // send a queue.delete for the subscription deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - _subscriptions.remove(name); - _reverseSubscriptionMap.remove(subscriber); } else { @@ -1763,7 +1894,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + " Requesting queue deletion regardless."); } - + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else // Queue Browser @@ -1773,9 +1904,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } - else + else if (!safe) { - throw new InvalidDestinationException("Unknown subscription exchange:" + name); + throw new InvalidDestinationException("Unknown subscription name: " + name); } } } @@ -1889,10 +2020,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_consumers.remove(consumer.getConsumerTag()) != null) { - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) + _subscriberAccess.lock(); + try + { + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if (subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } + } + finally { - _subscriptions.remove(subscriptionName); + _subscriberAccess.unlock(); } Destination dest = consumer.getDestination(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 41ad9f543e..75db5d5673 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -34,10 +34,7 @@ import java.util.TimerTask; import java.util.UUID; import javax.jms.Destination; -import javax.jms.IllegalStateException; import javax.jms.JMSException; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; @@ -916,72 +913,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic }, _connection).execute(); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic=checkValidTopic(topic, true); - AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - - TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName=((AMQTopic) topic).getBindingKeys()[0]; - } - else - { - topicName=new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - protected Long requestQueueDepth(AMQDestination amqd) { return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 939e29e4d5..89fbd66e71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,31 +21,63 @@ package org.apache.qpid.client; -import javax.jms.*; -import javax.jms.IllegalStateException; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.ReturnMessage; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicAckBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -442,74 +474,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic, true); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName = ((AMQTopic) topic).getRoutingKey(); - } - else - { - topicName = new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - - - - public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException { new FailoverRetrySupport<Object, AMQException>( diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 5f3f4e7d19..2704393cc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -46,11 +46,6 @@ public class AMQTopic extends AMQDestination implements Topic super(binding); } -// public AMQTopic(String exchangeName, String routingKey) -// { -// this(new AMQShortString(exchangeName), new AMQShortString(routingKey)); -// } - public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) { super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); @@ -79,8 +74,7 @@ public class AMQTopic extends AMQDestination implements Topic public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { - super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, - queueName, isDurable); + super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, @@ -103,13 +97,6 @@ public class AMQTopic extends AMQDestination implements Topic true); } - public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection) - throws JMSException - { - return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false, - getDurableTopicQueueName(subscriptionName, connection), true); - } - public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { return new AMQShortString(connection.getClientID() + ":" + subscriptionName); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index cbc2078571..abb0781536 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.queue; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -35,14 +38,11 @@ import junit.framework.Assert; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; - public class TimeToLiveTest extends QpidBrokerTestCase { private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class); @@ -253,7 +253,7 @@ public class TimeToLiveTest extends QpidBrokerTestCase producerSession.commit(); //resubscribe - durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName()); + durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); // Ensure we sleep the required amount of time. ReentrantLock waitLock = new ReentrantLock(); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java index 3030572e13..989ac98747 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java @@ -17,7 +17,17 @@ */ package org.apache.qpid.test.unit.ct; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -75,7 +85,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase } catch (Exception e) { - System.out.println("problems shutting down arjuna-ms"); + _logger.error("problems restarting broker: " + e); throw e; } //now recreate the durable subscriber and check the received messages @@ -102,7 +112,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase * create and register a durable subscriber with a message selector and then close it * crash the broker * create a publisher and send 5 right messages and 5 wrong messages - * recreate the durable subscriber and check the received the 5 expected messages + * recreate the durable subscriber and check we receive the 5 expected messages */ public void testDurSubRestoresMessageSelector() throws Exception { @@ -125,7 +135,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase } catch (Exception e) { - System.out.println("problems shutting down arjuna-ms"); + _logger.error("problems restarting broker: " + e); throw e; } topic = (Topic) getInitialContext().lookup(_topicName); @@ -148,7 +158,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase //now recreate the durable subscriber and check the received messages TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false); durConnection2.start(); for (int i = 0; i < 5; i++) { @@ -385,6 +395,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart1", ((TextMessage) rMsg).getText()); + + // Queue has no messages left + AMQQueue subQueueTmp = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueueTmp)); rMsg = subA.receive(1000); assertNull(rMsg); @@ -403,14 +417,14 @@ public class DurableSubscriberTest extends QpidBrokerTestCase // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, - "testResubscribeWithChangedSelectorAndRestart","Match = False", false); + "testResubscribeWithChangedSelectorAndRestart", + "Match = false", false); //verify no messages are now present on the queue as changing selector should have issued //an unsubscribe and thus deleted the previous durable backing queue for the subscription. - //check the dur sub's underlying queue now has msg count 1 - AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector"); - assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue)); - + //check the dur sub's underlying queue now has msg count 0 + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); // Check that new messages are received properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); @@ -429,6 +443,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase rMsg = subB.receive(1000); assertNull(rMsg); + //check the dur sub's underlying queue now has msg count 0 + subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + //now restart the server try { @@ -440,28 +458,49 @@ public class DurableSubscriberTest extends QpidBrokerTestCase throw e; } - // Check that new messages are still received properly + // Reconnect to broker + Connection connection = getConnectionFactory().createConnection("guest", "guest"); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = new AMQTopic((AMQConnection) connection, "testResubscribeWithChangedSelectorAndRestart"); + producer = session.createProducer(topic); + + //verify no messages now present on the queue after we restart the broker + //check the dur sub's underlying queue now has msg count 0 + subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + + // Reconnect with new selector that matches B + TopicSubscriber subC = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorAndRestart", + "Match = False", false); + + // Check that new messages are still sent and recieved properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); + + //check the dur sub's underlying queue now has msg count 1 + subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); - rMsg = subB.receive(1000); + rMsg = subC.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart2", ((TextMessage) rMsg).getText()); - rMsg = subB.receive(1000); + rMsg = subC.receive(1000); assertNull(rMsg); session.unsubscribe("testResubscribeWithChangedSelectorAndRestart"); - subB.close(); + + subC.close(); session.close(); - conn.close(); + connection.close(); } - } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index d1132c14fb..3dd3c72024 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -23,16 +23,6 @@ package org.apache.qpid.test.unit.topic; import java.io.IOException; import java.util.Set; -import org.apache.qpid.management.common.JMXConnnectionFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Connection; import javax.jms.InvalidDestinationException; import javax.jms.InvalidSelectorException; @@ -48,6 +38,15 @@ import javax.management.MBeanServerConnection; import javax.management.ObjectName; import javax.management.remote.JMXConnector; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.management.common.JMXConnnectionFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as * a static on a base test helper class, e.g. TestUtils. @@ -118,11 +117,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); - ((AMQSession)session1).sync(); + ((AMQSession<?, ?>) session1).sync(); //check the dur sub's underlying queue now has msg count 1 AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); - assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue)); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); @@ -141,16 +140,16 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); - ((AMQSession)session2).sync(); + ((AMQSession<?, ?>) session2).sync(); //check the dur sub's underlying queue now has msg count 0 - assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); session2.unsubscribe("MySubscription"); - ((AMQSession)session2).sync(); + ((AMQSession<?, ?>) session2).sync(); if(isJavaBroker() && isExternalBroker()) { @@ -435,7 +434,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase con3.close(); } - /*** + /** * This tests the fix for QPID-1085 * Creates a durable subscriber with an invalid selector, checks that the * exception is thrown correctly and that the subscription is not created. @@ -472,7 +471,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase session.unsubscribe("testDurableWithInvalidSelectorSub"); } - /*** + /** * This tests the fix for QPID-1085 * Creates a durable subscriber with an invalid destination, checks that the * exception is thrown correctly and that the subscription is not created. @@ -509,9 +508,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase } /** - * Tests QPID-1202 * Creates a durable subscription with a selector, then changes that selector on resubscription - * @throws Exception + * <p> + * QPID-1202, QPID-2418 */ public void testResubscribeWithChangedSelector() throws Exception { @@ -544,8 +543,14 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); - - // Check messages are received properly + + //verify no messages are now present as changing selector should have issued + //an unsubscribe and thus deleted the previous backing queue for the subscription. + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Should not have received message as the queue underlying the " + + "subscription should have been cleared/deleted when the selector was changed", rMsg); + + // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); @@ -594,9 +599,226 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg.setBooleanProperty("Match", false); producer.send(msg); } - - public static junit.framework.Test suite() + + + /** + * create and register a durable subscriber with a message selector and then close it + * create a publisher and send 5 right messages and 5 wrong messages + * create another durable subscriber with the same selector and name + * check messages are still there + * <p> + * QPID-2418 + */ + public void testDurSubSameMessageSelector() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector"); + + //create and register a durable subscriber with a message selector and then close it + TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); + subOne.close(); + + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < 5; i++) + { + Message message = session.createMessage(); + message.setBooleanProperty("testprop", true); + producer.send(message); + message = session.createMessage(); + message.setBooleanProperty("testprop", false); + producer.send(message); + } + producer.close(); + + // now recreate the durable subscriber and check the received messages + TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); + + // should be 5 messages on queue now + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); + assertEquals("Queue depth is wrong", 5, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + for (int i = 0; i < 5; i++) + { + Message message = subTwo.receive(1000); + if (message == null) + { + fail("sameMessageSelector test failed. no message was returned"); + } + else + { + assertEquals("sameMessageSelector test failed. message selector not reset", + "true", message.getStringProperty("testprop")); + } + } + + // Check queue has no messages + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + // Unsubscribe + session.unsubscribe("sameMessageSelector"); + + conn.close(); + } + + /** + * <ul> + * <li>create and register a durable subscriber with a message selector + * <li>create another durable subscriber with a different selector and same name + * <li>check first subscriber is now closed + * <li>create a publisher and send messages + * <li>check messages are recieved correctly + * </ul> + * <p> + * QPID-2418 + */ + public void testResubscribeWithChangedSelectorNoClose() throws Exception { - return new junit.framework.TestSuite(DurableSubscriptionTest.class); - } + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorNoClose"); + + // Create durable subscriber that matches A + TopicSubscriber subA = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorNoClose", + "Match = True", false); + + // Reconnect with new selector that matches B + TopicSubscriber subB = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorNoClose", + "Match = false", false); + + // First subscription has been closed + try + { + subA.receive(1000); + fail("First subscription was not closed"); + } + catch (Exception e) + { + e.printStackTrace(); + } + + // Send 1 matching message and 1 non-matching message + MessageProducer producer = session.createProducer(topic); + TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + + // should be 1 message on queue now + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); + assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + Message rMsg = subB.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelectorAndRestart2", + ((TextMessage) rMsg).getText()); + + rMsg = subB.receive(1000); + assertNull(rMsg); + + // Check queue has no messages + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + conn.close(); + } + + /** + * <ul> + * <li>create and register a durable subscriber with no message selector + * <li>create another durable subscriber with a selector and same name + * <li>check first subscriber is now closed + * <li>create a publisher and send messages + * <li>check messages are recieved correctly + * </ul> + * <p> + * QPID-2418 + */ + public void testDurSubAddMessageSelectorNoClose() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); + + // create and register a durable subscriber with no message selector + TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false); + + // now create a durable subscriber with a selector + TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false); + + // First subscription has been closed + try + { + subOne.receive(1000); + fail("First subscription was not closed"); + } + catch (Exception e) + { + e.printStackTrace(); + } + + // Send 1 matching message and 1 non-matching message + MessageProducer producer = session.createProducer(topic); + TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("testprop", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("testprop", false); + producer.send(msg); + + // should be 1 message on queue now + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); + assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + Message rMsg = subTwo.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelectorAndRestart1", + ((TextMessage) rMsg).getText()); + + rMsg = subTwo.receive(1000); + assertNull(rMsg); + + // Check queue has no messages + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + conn.close(); + } + + /** + * <ul> + * <li>create and register a durable subscriber with no message selector + * <li>try to create another durable with the same name, should fail + * </ul> + * <p> + * QPID-2418 + */ + public void testDurSubNoSelectorResubscribeNoClose() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); + + // create and register a durable subscriber with no message selector + session.createDurableSubscriber(topic, "subscriptionName", null, false); + + // try to recreate the durable subscriber + try + { + session.createDurableSubscriber(topic, "subscriptionName", null, false); + fail("Subscription should not have been created"); + } + catch (Exception e) + { + e.printStackTrace(); + } + } } |