summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-20 14:54:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-20 14:54:01 +0000
commitfc6d79eb365027d1fdda43ae0081f72dd45b7896 (patch)
treeb8bd2f4f43faf9ce43438b7503e548e111f79512
parent7ec92c0a43be9e5934b35565a7f46eb83e36f6d1 (diff)
downloadqpid-python-fc6d79eb365027d1fdda43ae0081f72dd45b7896.tar.gz
QPID-101
Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie AMQChannel.java - record messages browsed so not to discard them on ack. FilterManagerFactory.java - Added a NoConsumerFilter ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers Subscription.java - Added autoClose and isBrowser methods SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods Added NoConsumerFilter.java Patches from Rob Godfrey for client implmentation AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method. BasicMessageConsumer.java - updates to correctly close consumer when an BasicCancel is received from the broker. AMQProtocolSession.java - method to allow cancellation of the client AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489106 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java153
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java128
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java90
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java1
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java15
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java15
14 files changed, 579 insertions, 75 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3081181c80..c5b45659cf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -46,6 +46,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -110,6 +112,7 @@ public class AMQChannel
private TxAck ackOp;
private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+ private Set<Long> _browsedAcks = new HashSet<Long>();
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
@@ -555,7 +558,14 @@ public class AMQChannel
for (UnacknowledgedMessage msg : acked)
{
- msg.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
}
}
@@ -572,7 +582,16 @@ public class AMQChannel
_log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
- msg.discard();
+
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+
if (_log.isTraceEnabled())
{
_log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
@@ -693,6 +712,12 @@ public class AMQChannel
_returns.clear();
}
+ public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+ {
+ _browsedAcks.add(deliveryTag);
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
//we use this wrapper to ensure we are always using the correct
//map instance (its not final unfortunately)
private class AckMap implements UnacknowledgedMessageMap
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
index 6ecd56586f..49f99132ef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
@@ -34,7 +35,6 @@ public class FilterManagerFactory
private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class);
//fixme move to a common class so it can be refered to from client code.
- private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
public static FilterManager createManager(FieldTable filters) throws AMQException
{
@@ -51,7 +51,7 @@ public class FilterManagerFactory
{
String key = (String) it.next();
_logger.info("filter:" + key);
- if (key.equals(JMS_SELECTOR_FILTER))
+ if (key.equals(AMQPFilterTypes.JMS_SELECTOR.getValue()))
{
String selector = (String) filters.get(key);
@@ -61,6 +61,11 @@ public class FilterManagerFactory
}
}
+ if (key.equals(AMQPFilterTypes.NO_CONSUME.getValue()))
+ {
+ manager.add(new NoConsumerFilter());
+ }
+
}
//If we added no filters don't bear the overhead of having an filter manager
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
new file mode 100644
index 0000000000..283d324ff6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.filter.jms.selector.SelectorParser;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.log4j.Logger;
+
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+public class NoConsumerFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(NoConsumerFilter.class);
+
+
+ public NoConsumerFilter() throws AMQException
+ {
+ _logger.info("Created NoConsumerFilter");
+ }
+
+ public boolean matches(AMQMessage message)
+ {
+ return true;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f99f2d78b7..2100734ada 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -164,7 +164,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
subscription.enqueueForPreDelivery(message);
}
- }
+ }
}
public synchronized void removeAMessageFromTop() throws AMQException
@@ -187,11 +187,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
{
AMQMessage message = messages.peek();
- while (message != null && message.taken())
+ while (message != null && (sub.isBrowser() || message.taken()))
{
//remove the already taken message
messages.poll();
@@ -201,12 +201,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue)
+ public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
{
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue);
+ message = getNextMessage(messageQueue, sub);
// message will be null if we have no messages in the messageQueue.
if (message == null)
@@ -215,7 +215,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
_log.info("Async Delivery Message:" + message + " to :" + sub);
- sub.send(message, queue);
+ sub.send(message, _queue);
//remove sent message from our queue.
messageQueue.poll();
@@ -244,21 +244,33 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!sub.isSuspended())
{
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue);
- }
- else
- {
- sendNextMessage(sub, _messages, _queue);
- }
-
+ sendNextMessage(sub);
+
hasSubscribers = true;
}
}
}
}
+ private void sendNextMessage(Subscription sub)
+ {
+ if (sub.hasFilters())
+ {
+ sendNextMessage(sub, sub.getPreDeliveryQueue());
+ if (sub.isAutoClose())
+ {
+ if (sub.getPreDeliveryQueue().isEmpty())
+ {
+ sub.close();
+ }
+ }
+ }
+ else
+ {
+ sendNextMessage(sub, _messages);
+ }
+ }
+
private AMQMessage poll()
{
return _messages.poll();
@@ -359,9 +371,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void processAsync(Executor executor)
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 523b5f06e9..a5672f2b19 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -40,4 +40,9 @@ public interface Subscription
void enqueueForPreDelivery(AMQMessage msg);
+ boolean isAutoClose();
+
+ void close();
+
+ boolean isBrowser();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index f4e7482396..4272541298 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -24,11 +24,13 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -64,6 +66,9 @@ public class SubscriptionImpl implements Subscription
*/
private final boolean _acks;
private FilterManager _filters;
+ private final boolean _isBrowser;
+ private final Boolean _autoClose;
+ private boolean _closed = false;
public static class Factory implements SubscriptionFactory
{
@@ -105,9 +110,48 @@ public class SubscriptionImpl implements Subscription
_filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ if (isBrowser != null)
+ {
+ _isBrowser = (Boolean) isBrowser;
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+
+
+ if (_filters != null)
+ {
+ Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+
if (_filters != null)
{
_messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
}
else
{
@@ -116,6 +160,7 @@ public class SubscriptionImpl implements Subscription
}
}
+
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
throws AMQException
@@ -160,44 +205,78 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- try
+ if (_isBrowser)
+ {
+ sendToBrowser(msg, queue);
+ }
+ else
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ sendToConsumer(msg, queue);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- queue.dequeue(msg);
- }
- synchronized(channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+ // received the message. If it is lost in transit that is not important.
+ if (_acks)
+ {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
+ protocolSession.writeFrame(frame);
+ }
+ }
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ try
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- protocolSession.writeFrame(frame);
- }
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ queue.dequeue(msg);
}
- finally
+ synchronized(channel)
{
- msg.setDeliveredToConsumer();
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
}
}
- else
+ finally
{
- _logger.error("Attempt to send Null message", new NullPointerException());
+ msg.setDeliveredToConsumer();
}
}
@@ -290,6 +369,26 @@ public class SubscriptionImpl implements Subscription
}
}
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public void close()
+ {
+ if (!_closed)
+ {
+ _logger.info("Closing autoclose subscription:" + this);
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ _closed = true;
+ }
+ }
+
+ public boolean isBrowser()
+ {
+ return _isBrowser;
+ }
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
new file mode 100644
index 0000000000..5c753946a6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.log4j.Logger;
+
+import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+public class AMQQueueBrowser implements QueueBrowser
+{
+ private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class);
+
+
+ private AtomicBoolean _isClosed = new AtomicBoolean();
+ private final AMQSession _session;
+ private final AMQQueue _queue;
+ private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>();
+ private final String _messageSelector;
+
+
+ AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException
+ {
+ _session = session;
+ _queue = queue;
+ _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ consumer.close();
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ checkState();
+ return _queue;
+ }
+
+ private void checkState() throws JMSException
+ {
+ if (_isClosed.get())
+ {
+ throw new IllegalStateException("Queue Browser");
+ }
+ if (_session.isClosed())
+ {
+ throw new IllegalStateException("Session is closed");
+ }
+
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+
+ checkState();
+ return _messageSelector;
+ }
+
+ public Enumeration getEnumeration() throws JMSException
+ {
+ checkState();
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ _consumers.add(consumer);
+
+ return new Enumeration()
+ {
+
+
+ Message _nextMessage = consumer.receive();
+
+
+ public boolean hasMoreElements()
+ {
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ return (_nextMessage != null);
+ }
+
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
+ {
+ _logger.info("QB:nextElement about to receive");
+
+ _nextMessage = consumer.receive();
+ _logger.info("QB:nextElement received:" + _nextMessage);
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
+ }
+
+ return msg;
+ }
+ };
+ }
+
+ public void close() throws JMSException
+ {
+ for (BasicMessageConsumer consumer : _consumers)
+ {
+ consumer.close();
+ }
+ _consumers.clear();
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bf61550cdc..2136d565f1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -51,7 +51,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -146,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _inRecovery;
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false,
false,
null,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
noLocal,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
+ }
+
+ public MessageConsumer createBrowserConsumer(Destination destination,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null,
+ true,
+ true);
}
public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
@@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final boolean noLocal,
final boolean exclusive,
final String selector,
- final FieldTable rawSelector) throws JMSException
+ final FieldTable rawSelector,
+ final boolean noConsume,
+ final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -948,7 +973,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode);
+ _acknowledgeMode, noConsume, autoClose);
try
{
@@ -1082,6 +1107,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
}
+ if(consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+ if(consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
@@ -1303,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1586,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+ public void confirmConsumerCancelled(String consumerTag)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if((consumer != null) && (consumer.isAutoClose()))
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ }
+
+
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
@@ -1616,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new javax.jms.InvalidDestinationException("Invalid Queue");
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index f0d3cf5abc..673321cd9d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -145,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private Thread _receivingThread;
+ /**
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
+ */
+ private boolean _autoClose;
+ private boolean _closeWhenNoMessages;
+
+ private boolean _noConsume;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+ boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -164,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+ _autoClose = autoClose;
+ _noConsume = noConsume;
}
public AMQDestination getDestination()
@@ -321,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = null;
if (l > 0)
{
@@ -350,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ private boolean closeOnAutoClose() throws JMSException
+ {
+ if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ {
+ close(false);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -358,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -402,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
public void close() throws JMSException
{
+ close(true);
+ }
+
+ public void close(boolean sendClose) throws JMSException
+ {
synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
- try
+ if(sendClose)
{
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
- }
- catch (AMQException e)
- {
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error closing consumer: " + e, e);
+ throw new JMSException("Error closing consumer: " + e);
+ }
}
deregisterConsumer();
@@ -630,4 +671,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_unacknowledgedDeliveryTags.clear();
}
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+
+ public boolean isNoConsume()
+ {
+ return _noConsume;
+ }
+
+ public void closeWhenNoMessages(boolean b)
+ {
+ _closeWhenNoMessages = b;
+
+ if(_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
+ {
+ _receivingThread.interrupt();
+ }
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
new file mode 100644
index 0000000000..d855e97204
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
@@ -0,0 +1,35 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class BasicCancelOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
+ private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+
+ public static BasicCancelOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicCancelOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("New BasicCancelOk method received");
+ BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+ evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index a4ed89719b..6a40fd3133 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
+
+ public void confirmConsumerCancelled(int channelId, String consumerTag)
+ {
+ final Integer chId = channelId;
+ final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+ session.confirmConsumerCancelled(consumerTag);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 887850c06e..50bd1667f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+ frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index cc7f6ecd2a..c751e4a011 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -122,6 +122,21 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
//no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
}
+ public boolean isAutoClose()
+ {
+ return false;
+ }
+
+ public void close()
+ {
+ //no-op
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void sendNextMessage(AMQQueue queue)
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 2de22f9084..fea3c93280 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -91,6 +91,21 @@ public class SubscriptionTestHelper implements Subscription
//no-op
}
+ public boolean isAutoClose()
+ {
+ return false;
+ }
+
+ public void close()
+ {
+ //no-op
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public int hashCode()
{
return key.hashCode();