diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-12-20 14:54:01 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-12-20 14:54:01 +0000 |
commit | fc6d79eb365027d1fdda43ae0081f72dd45b7896 (patch) | |
tree | b8bd2f4f43faf9ce43438b7503e548e111f79512 | |
parent | 7ec92c0a43be9e5934b35565a7f46eb83e36f6d1 (diff) | |
download | qpid-python-fc6d79eb365027d1fdda43ae0081f72dd45b7896.tar.gz |
QPID-101
Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie
AMQChannel.java - record messages browsed so not to discard them on ack.
FilterManagerFactory.java - Added a NoConsumerFilter
ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers
Subscription.java - Added autoClose and isBrowser methods
SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods
Added NoConsumerFilter.java
Patches from Rob Godfrey for client implmentation
AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method.
BasicMessageConsumer.java - updates to correctly close consumer when an BasicCancel is received from the broker.
AMQProtocolSession.java - method to allow cancellation of the client
AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java
Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489106 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 579 insertions, 75 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3081181c80..c5b45659cf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -46,6 +46,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.Set; +import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -110,6 +112,7 @@ public class AMQChannel private TxAck ackOp; private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); + private Set<Long> _browsedAcks = new HashSet<Long>(); public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException @@ -555,7 +558,14 @@ public class AMQChannel for (UnacknowledgedMessage msg : acked) { - msg.discard(); + if (!_browsedAcks.contains(deliveryTag)) + { + msg.discard(); + } + else + { + _browsedAcks.remove(deliveryTag); + } } } @@ -572,7 +582,16 @@ public class AMQChannel _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } - msg.discard(); + + if (!_browsedAcks.contains(deliveryTag)) + { + msg.discard(); + } + else + { + _browsedAcks.remove(deliveryTag); + } + if (_log.isTraceEnabled()) { _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag); @@ -693,6 +712,12 @@ public class AMQChannel _returns.clear(); } + public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue) + { + _browsedAcks.add(deliveryTag); + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + //we use this wrapper to ensure we are always using the correct //map instance (its not final unfortunately) private class AckMap implements UnacknowledgedMessageMap diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java index 6ecd56586f..49f99132ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; @@ -34,7 +35,6 @@ public class FilterManagerFactory private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class); //fixme move to a common class so it can be refered to from client code. - private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector"; public static FilterManager createManager(FieldTable filters) throws AMQException { @@ -51,7 +51,7 @@ public class FilterManagerFactory { String key = (String) it.next(); _logger.info("filter:" + key); - if (key.equals(JMS_SELECTOR_FILTER)) + if (key.equals(AMQPFilterTypes.JMS_SELECTOR.getValue())) { String selector = (String) filters.get(key); @@ -61,6 +61,11 @@ public class FilterManagerFactory } } + if (key.equals(AMQPFilterTypes.NO_CONSUME.getValue())) + { + manager.add(new NoConsumerFilter()); + } + } //If we added no filters don't bear the overhead of having an filter manager diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java new file mode 100644 index 0000000000..283d324ff6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.filter.jms.selector.SelectorParser; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.log4j.Logger; + + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +public class NoConsumerFilter implements MessageFilter +{ + private final static Logger _logger = org.apache.log4j.Logger.getLogger(NoConsumerFilter.class); + + + public NoConsumerFilter() throws AMQException + { + _logger.info("Created NoConsumerFilter"); + } + + public boolean matches(AMQMessage message) + { + return true; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f99f2d78b7..2100734ada 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -164,7 +164,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { subscription.enqueueForPreDelivery(message); } - } + } } public synchronized void removeAMessageFromTop() throws AMQException @@ -187,11 +187,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - private AMQMessage getNextMessage(Queue<AMQMessage> messages) + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) { AMQMessage message = messages.peek(); - while (message != null && message.taken()) + while (message != null && (sub.isBrowser() || message.taken())) { //remove the already taken message messages.poll(); @@ -201,12 +201,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } - public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue) + public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue) { AMQMessage message = null; try { - message = getNextMessage(messageQueue); + message = getNextMessage(messageQueue, sub); // message will be null if we have no messages in the messageQueue. if (message == null) @@ -215,7 +215,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } _log.info("Async Delivery Message:" + message + " to :" + sub); - sub.send(message, queue); + sub.send(message, _queue); //remove sent message from our queue. messageQueue.poll(); @@ -244,21 +244,33 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!sub.isSuspended()) { - if (sub.hasFilters()) - { - sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue); - } - else - { - sendNextMessage(sub, _messages, _queue); - } - + sendNextMessage(sub); + hasSubscribers = true; } } } } + private void sendNextMessage(Subscription sub) + { + if (sub.hasFilters()) + { + sendNextMessage(sub, sub.getPreDeliveryQueue()); + if (sub.isAutoClose()) + { + if (sub.getPreDeliveryQueue().isEmpty()) + { + sub.close(); + } + } + } + else + { + sendNextMessage(sub, _messages); + } + } + private AMQMessage poll() { return _messages.poll(); @@ -359,9 +371,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void processAsync(Executor executor) { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); + _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get()); if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 523b5f06e9..a5672f2b19 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -40,4 +40,9 @@ public interface Subscription void enqueueForPreDelivery(AMQMessage msg); + boolean isAutoClose(); + + void close(); + + boolean isBrowser(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index f4e7482396..4272541298 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -24,11 +24,13 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -64,6 +66,9 @@ public class SubscriptionImpl implements Subscription */ private final boolean _acks; private FilterManager _filters; + private final boolean _isBrowser; + private final Boolean _autoClose; + private boolean _closed = false; public static class Factory implements SubscriptionFactory { @@ -105,9 +110,48 @@ public class SubscriptionImpl implements Subscription _filters = FilterManagerFactory.createManager(filters); + + if (_filters != null) + { + Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue()); + if (isBrowser != null) + { + _isBrowser = (Boolean) isBrowser; + } + else + { + _isBrowser = false; + } + } + else + { + _isBrowser = false; + } + + + if (_filters != null) + { + Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); + if (autoClose != null) + { + _autoClose = (Boolean) autoClose; + } + else + { + _autoClose = false; + } + } + else + { + _autoClose = false; + } + + if (_filters != null) { _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + + } else { @@ -116,6 +160,7 @@ public class SubscriptionImpl implements Subscription } } + public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException @@ -160,44 +205,78 @@ public class SubscriptionImpl implements Subscription { if (msg != null) { - try + if (_isBrowser) + { + sendToBrowser(msg, queue); + } + else { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + sendToConsumer(msg, queue); + } + } + else + { + _logger.error("Attempt to send Null message", new NullPointerException()); + } + } - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + { + // We don't decrement the reference here as we don't want to consume the message + // but we do want to send it to the client. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) - { - queue.dequeue(msg); - } - synchronized(channel) - { - long deliveryTag = channel.getNextDeliveryTag(); + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); + + // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client + // received the message. If it is lost in transit that is not important. + if (_acks) + { + channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); + } + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } + protocolSession.writeFrame(frame); + } + } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + { + try + { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - protocolSession.writeFrame(frame); - } + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) + { + queue.dequeue(msg); } - finally + synchronized(channel) { - msg.setDeliveredToConsumer(); + long deliveryTag = channel.getNextDeliveryTag(); + + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + + protocolSession.writeFrame(frame); } } - else + finally { - _logger.error("Attempt to send Null message", new NullPointerException()); + msg.setDeliveredToConsumer(); } } @@ -290,6 +369,26 @@ public class SubscriptionImpl implements Subscription } } + public boolean isAutoClose() + { + return _autoClose; + } + + public void close() + { + if (!_closed) + { + _logger.info("Closing autoclose subscription:" + this); + protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag)); + _closed = true; + } + } + + public boolean isBrowser() + { + return _isBrowser; + } + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java new file mode 100644 index 0000000000..5c753946a6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.log4j.Logger; + +import java.util.Enumeration; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.*; +import javax.jms.IllegalStateException; + +public class AMQQueueBrowser implements QueueBrowser +{ + private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class); + + + private AtomicBoolean _isClosed = new AtomicBoolean(); + private final AMQSession _session; + private final AMQQueue _queue; + private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>(); + private final String _messageSelector; + + + AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException + { + _session = session; + _queue = queue; + _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector; + BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + consumer.close(); + } + + public Queue getQueue() throws JMSException + { + checkState(); + return _queue; + } + + private void checkState() throws JMSException + { + if (_isClosed.get()) + { + throw new IllegalStateException("Queue Browser"); + } + if (_session.isClosed()) + { + throw new IllegalStateException("Session is closed"); + } + + } + + public String getMessageSelector() throws JMSException + { + + checkState(); + return _messageSelector; + } + + public Enumeration getEnumeration() throws JMSException + { + checkState(); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + _consumers.add(consumer); + + return new Enumeration() + { + + + Message _nextMessage = consumer.receive(); + + + public boolean hasMoreElements() + { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + return (_nextMessage != null); + } + + public Object nextElement() + { + Message msg = _nextMessage; + try + { + _logger.info("QB:nextElement about to receive"); + + _nextMessage = consumer.receive(); + _logger.info("QB:nextElement received:" + _nextMessage); + } + catch (JMSException e) + { + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; + } + + return msg; + } + }; + } + + public void close() throws JMSException + { + for (BasicMessageConsumer consumer : _consumers) + { + consumer.close(); + } + _consumers.clear(); + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bf61550cdc..2136d565f1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -51,7 +51,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -146,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _inRecovery; + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, null, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, messageSelector, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi noLocal, false, messageSelector, - null); + null, + false, + false); + } + + public MessageConsumer createBrowserConsumer(Destination destination, + String messageSelector, + boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + noLocal, + false, + messageSelector, + null, + true, + true); } public MessageConsumer createConsumer(Destination destination, @@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } @@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } protected MessageConsumer createConsumerImpl(final Destination destination, @@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final boolean noLocal, final boolean exclusive, final String selector, - final FieldTable rawSelector) throws JMSException + final FieldTable rawSelector, + final boolean noConsume, + final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -948,7 +973,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode); + _acknowledgeMode, noConsume, autoClose); try { @@ -1082,6 +1107,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } + if(consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } + if(consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening @@ -1303,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return createBrowser(queue, null); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1586,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(channelFlowFrame); } + public void confirmConsumerCancelled(String consumerTag) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if((consumer != null) && (consumer.isAutoClose())) + { + consumer.closeWhenNoMessages(true); + } + } + + /* * I could have combined the last 3 methods, but this way it improves readability */ @@ -1616,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new javax.jms.InvalidDestinationException("Invalid Queue"); } } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f0d3cf5abc..673321cd9d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -145,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private Thread _receivingThread; + /** + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. + */ + private boolean _autoClose; + private boolean _closeWhenNoMessages; + + private boolean _noConsume; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) + boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, + int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -164,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); + _autoClose = autoClose; + _noConsume = noConsume; } public AMQDestination getDestination() @@ -321,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = null; if (l > 0) { @@ -350,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private boolean closeOnAutoClose() throws JMSException + { + if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + { + close(false); + return true; + } + else + { + return false; + } + } + public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -358,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -402,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + public void close() throws JMSException { + close(true); + } + + public void close(boolean sendClose) throws JMSException + { synchronized(_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); - - try + if(sendClose) { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); - } - catch (AMQException e) - { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); + + try + { + _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + } + catch (AMQException e) + { + _logger.error("Error closing consumer: " + e, e); + throw new JMSException("Error closing consumer: " + e); + } } deregisterConsumer(); @@ -630,4 +671,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.clear(); } + + public boolean isAutoClose() + { + return _autoClose; + } + + + public boolean isNoConsume() + { + return _noConsume; + } + + public void closeWhenNoMessages(boolean b) + { + _closeWhenNoMessages = b; + + if(_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) + { + _receivingThread.interrupt(); + } + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java new file mode 100644 index 0000000000..d855e97204 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -0,0 +1,35 @@ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class BasicCancelOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("New BasicCancelOk method received"); + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); + evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index a4ed89719b..6a40fd3133 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } + + public void confirmConsumerCancelled(int channelId, String consumerTag) + { + final Integer chId = channelId; + final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + + session.confirmConsumerCancelled(consumerTag); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 887850c06e..50bd1667f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); + frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index cc7f6ecd2a..c751e4a011 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -122,6 +122,21 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl } + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public void sendNextMessage(AMQQueue queue) { diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 2de22f9084..fea3c93280 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -91,6 +91,21 @@ public class SubscriptionTestHelper implements Subscription //no-op } + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public int hashCode() { return key.hashCode(); |