diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 09:56:17 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 09:56:17 +0000 |
commit | ef526606615726df58dd92ae095b1779ad4dcada (patch) | |
tree | 5b166d44adc7622951ba1a67388da404891bbc18 | |
parent | 0ea4d0f565810527578150853f7487959e079d1f (diff) | |
download | qpid-python-ef526606615726df58dd92ae095b1779ad4dcada.tar.gz |
QPID-540 Prevent NPE when purging message from the main _message queue in the ConcurrentSelectorDeliveryManager that have been delivered via a Subscribers _messageQueue. Ensuring that any expired messages are still correctly handled. i.e. the Queue size/depth is reduced and the message correctly dequeued from the underlying store.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556869 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 135 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index ad5e2d276e..695c2611cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -98,9 +97,9 @@ public class AMQMessage public void setExpiration() { long expiration = - ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); long timestamp = - ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) { @@ -163,8 +162,8 @@ public class AMQMessage { AMQBody cb = - getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), - _messageId, ++_index)); + getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), + _messageId, ++_index)); return new AMQFrame(_channel, cb); } @@ -250,7 +249,7 @@ public class AMQMessage * @throws AMQException */ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) - throws AMQException + throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(messageId, store, true); @@ -267,7 +266,7 @@ public class AMQMessage * @param contentHeader */ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, - ContentHeaderBody contentHeader) throws AMQException + ContentHeaderBody contentHeader) throws AMQException { this(messageId, info, txnContext); setContentHeaderBody(contentHeader); @@ -286,8 +285,8 @@ public class AMQMessage * @throws AMQException */ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, - ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, - MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException + ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, + MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException { this(messageId, info, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); @@ -335,7 +334,7 @@ public class AMQMessage } public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) - throws AMQException + throws AMQException { final boolean persistent = isPersistent(); _messageHandle = factory.createMessageHandle(_messageId, store, persistent); @@ -451,7 +450,7 @@ public class AMQMessage if (count < 0) { throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); + + " has gone below 0."); } } } @@ -668,12 +667,7 @@ public class AMQMessage { long now = System.currentTimeMillis(); - if (now > _expiration) - { - dequeue(storecontext, queue); - - return true; - } + return (now > _expiration); } return false; @@ -700,7 +694,7 @@ public class AMQMessage // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, - _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody()); + _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -936,7 +930,7 @@ public class AMQMessage // _taken + " by :" + _takenBySubcription; return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " - + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0b7b5e93d9..26a73fcc8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -1,18 +1,7 @@ /* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */ package org.apache.qpid.server.queue; -import java.text.MessageFormat; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.management.JMException; - import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; @@ -26,6 +15,15 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.management.JMException; +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. @@ -138,11 +136,11 @@ public class AMQQueue implements Managable, Comparable public int compareTo(Object o) { - return _name.compareTo(((AMQQueue)o).getName()); + return _name.compareTo(((AMQQueue) o).getName()); } public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException + throws AMQException { this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory()); @@ -422,6 +420,70 @@ public class AMQQueue implements Managable, Comparable } } + /** + * Removes messages from this queue, and also commits the remove on the message store. Delivery activity + * on the queues being moved between is suspended during the remove. + * + * @param fromMessageId The first message id to move. + * @param toMessageId The last message id to move. + * @param storeContext The context of the message store under which to perform the move. This is associated with + * the stores transactional context. + */ + public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) + { + MessageStore fromStore = getVirtualHost().getMessageStore(); + + try + { + // Obtain locks to prevent activity on the queues being moved between. + startMovingMessages(); + + // Get the list of messages to move. + List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); + + try + { + fromStore.beginTran(storeContext); + + // remove the messages in on the message store. + for (AMQMessage message : foundMessagesList) + { + fromStore.dequeueMessage(storeContext, _name, message.getMessageId()); + } + + // Commit and flush the move transcations. + try + { + fromStore.commitTran(storeContext); + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + + // remove the messages on the in-memory queues. + _deliveryMgr.removeMovedMessages(foundMessagesList); + } + // Abort the move transactions on move failures. + catch (AMQException e) + { + try + { + fromStore.abortTran(storeContext); + } + catch (AMQException ae) + { + throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae); + } + } + } + // Release locks to allow activity on the queues being moved between to continue. + finally + { + stopMovingMessages(); + } + } + public void startMovingMessages() { _deliveryMgr.startMovingMessages(); @@ -560,7 +622,7 @@ public class AMQQueue implements Managable, Comparable } Subscription subscription = - _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); + _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); if (subscription.filtersMessages()) { @@ -598,14 +660,14 @@ public class AMQQueue implements Managable, Comparable if (_logger.isDebugEnabled()) { _logger.debug(MessageFormat.format( - "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", - ps, channel, consumerTag, this)); + "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", + ps, channel, consumerTag, this)); } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) - == null) + == null) { throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag + " and protocol session key " + ps.getKey() + " not registered with queue " + this); @@ -787,7 +849,7 @@ public class AMQQueue implements Managable, Comparable return false; } - final AMQQueue amqQueue = (AMQQueue)o; + final AMQQueue amqQueue = (AMQQueue) o; return (_name.equals(amqQueue._name)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 2aa759b35d..1cdee20598 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -20,19 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.Collections; -import java.util.HashSet; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; @@ -42,8 +29,21 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.util.MessageQueue; import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; +import org.apache.qpid.util.MessageQueue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; /** Manages delivery of messages on behalf of a queue */ @@ -453,12 +453,29 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) while (purgeMessage(message, sub)) { + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. +// boolean alreadyTaken = message.taken(_queue, sub); + //remove the already taken message or expired AMQMessage removed = messages.poll(); assert removed == message; - _totalMessageSize.addAndGet(-message.getSize()); + // if the message expired then the _totalMessageSize needs adjusting + if (message.expired(sub.getChannel().getStoreContext(), _queue)) + { + _totalMessageSize.addAndGet(-message.getSize()); + + message.dequeue(sub.getChannel().getStoreContext(), _queue); + + if (_log.isInfoEnabled()) + { + _log.info(debugIdentity() + " Doing clean up of the main _message queue."); + } + } + //else the clean up is not required as the message has already been taken for this queue therefore + // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated. if (_log.isTraceEnabled()) { @@ -473,7 +490,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * + * This method will return true if the message is to be purged from the queue. + * + * + * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue) * @param message * @param sub * @return @@ -606,7 +626,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isInfoEnabled()) { - _log.info(debugIdentity() + "We could do clean up of the main _message queue here"); + //fixme - we should do the clean up as the message remains on the _message queue + // this is resulting in the next consumer receiving the message and then attempting to purge it + // + _log.info(debugIdentity() + "We should do clean up of the main _message queue here"); } } @@ -800,7 +823,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (debugEnabled) { _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + - "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); + "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); } } } @@ -810,7 +833,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (debugEnabled) { _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + - " Subscriber:" + System.identityHashCode(s)); + " Subscriber:" + System.identityHashCode(s)); } deliver(context, name, msg, deliverFirst); |