diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-02-09 12:29:14 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-02-09 12:29:14 +0000 |
commit | a2f74529cf6680a7e8ce21bc4fe21f0e80d4225b (patch) | |
tree | bc09fd08fd63eef42e868b0522110c1a7d1434ed | |
parent | 6b335bd2d33d898856f36c147c5f7fdabddd6ec3 (diff) | |
download | qpid-python-a2f74529cf6680a7e8ce21bc4fe21f0e80d4225b.tar.gz |
QPID-170
predelivery queues will also be cleared with moved messages. Messages will be moved to another queue and predelivery queues of subsribers of another queue will also be populated.
the features - removeMmessageFromTop and clearQueue is also modified by using the getNextMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@505268 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 177 insertions, 77 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index ce1db7d26e..c114d2d25f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -41,6 +41,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like @@ -157,7 +158,7 @@ public class AMQQueue implements Managable, Comparable /** * total messages received by the queue since startup. */ - public long _totalMessagesReceived = 0; + public AtomicLong _totalMessagesReceived = new AtomicLong(); public int compareTo(Object o) { @@ -291,59 +292,77 @@ public class AMQQueue implements Managable, Comparable } /** - * @see ManagedQueue#moveMessages + * moves messages from this queue to another queue. to do this the approach is following- + * - setup the queue for moving messages (hold the lock and stop the async delivery) + * - get all the messages available in the given message id range + * - setup the other queue for moving messages (hold the lock and stop the async delivery) + * - send these available messages to the other queue (enqueue in other queue) + * - Once sending to other Queue is successful, remove messages from this queue + * - remove locks from both queues and start async delivery * @param fromMessageId * @param toMessageId * @param queueName * @param storeContext - * @throws AMQException */ public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - StoreContext storeContext) throws AMQException + StoreContext storeContext) { + // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - List<AMQMessage> list = getMessagesOnTheQueue(); - List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); - int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1); - for (AMQMessage message : list) + try { - long msgId = message.getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) - { - foundMessagesList.add(message); - } - // break the loop as soon as messages to be removed are found - if (foundMessagesList.size() == maxMessageCountToBeMoved) + startMovingMessages(); + List<AMQMessage> list = getMessagesOnTheQueue(); + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); + int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1); + + // Run this loop till you find all the messages or the list has no more messages + for (AMQMessage message : list) { - break; + long msgId = message.getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(message); + } + // break the loop as soon as messages to be removed are found + if (foundMessagesList.size() == maxMessageCountToBeMoved) + { + break; + } } - } - // move messages to another queue - for (AMQMessage message : foundMessagesList) + // move messages to another queue + anotherQueue.startMovingMessages(); + anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList); + + // moving is successful, now remove from original queue + _deliveryMgr.removeMovedMessages(foundMessagesList); + } + finally { - try - { - anotherQueue.process(storeContext, message); - } - catch(AMQException ex) - { - foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear(); - // Exception occured, so rollback the changes - anotherQueue.removeMessages(foundMessagesList); - throw ex; - } + // remove the lock and start the async delivery + anotherQueue.stopMovingMessages(); + stopMovingMessages(); } + } - // moving is successful, now remove from original queue - removeMessages(foundMessagesList); + public void startMovingMessages() + { + _deliveryMgr.startMovingMessages(); } - public synchronized void removeMessages(List<AMQMessage> messageList) + private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList) { - _deliveryMgr.removeMessages(messageList); + _deliveryMgr.enqueueMovedMessages(storeContext, messageList); + _totalMessagesReceived.addAndGet(messageList.size()); } + public void stopMovingMessages() + { + _deliveryMgr.stopMovingMessages(); + _deliveryMgr.processAsync(_asyncDelivery); + } + /** * @return MBean object associated with this Queue */ @@ -374,7 +393,7 @@ public class AMQQueue implements Managable, Comparable public long getReceivedMessageCount() { - return _totalMessagesReceived; + return _totalMessagesReceived.get(); } public int getMaximumMessageCount() @@ -407,7 +426,7 @@ public class AMQQueue implements Managable, Comparable /** * Removes the AMQMessage from the top of the queue. */ - public void deleteMessageFromTop(StoreContext storeContext) throws AMQException + public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { _deliveryMgr.removeAMessageFromTop(storeContext); } @@ -415,7 +434,7 @@ public class AMQQueue implements Managable, Comparable /** * removes all the messages from the queue. */ - public long clearQueue(StoreContext storeContext) throws AMQException + public synchronized long clearQueue(StoreContext storeContext) throws AMQException { return _deliveryMgr.clearAllMessages(storeContext); } @@ -633,7 +652,7 @@ public class AMQQueue implements Managable, Comparable protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException { - _totalMessagesReceived++; + _totalMessagesReceived.incrementAndGet(); try { _managedObject.checkForNotification(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index ba0d3b86d2..eb7e87b923 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -395,24 +395,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); } - - try - { - _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); - } - catch(AMQException amqex) - { - throw new JMException("Error moving messages to " + toQueueName + ": " + amqex); - } + _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); } -// -// public ObjectName getObjectName() throws MalformedObjectNameException -// { -// String objNameString = super.getObjectName().toString(); -// -// return new ObjectName(objNameString); -// } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 6c89101043..858dd5ee09 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -75,7 +75,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private final AMQQueue _queue; - + /** + * Flag used while moving messages from this queue to another. For moving messages the async delivery + * should also stop. This flat should be set to true to stop async delivery and set to false to enable + * async delivery again. + */ + private AtomicBoolean _movingMessages = new AtomicBoolean(); + /** * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered @@ -167,9 +173,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - public synchronized List<AMQMessage> getMessages() + public List<AMQMessage> getMessages() { - return new ArrayList<AMQMessage>(_messages); + _lock.lock(); + ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages); + _lock.unlock(); + return list; } public void populatePreDeliveryQueue(Subscription subscription) @@ -242,8 +251,52 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void removeMessages(List<AMQMessage> messageList) + /** + * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, + * so that the asyn delivery is also stopped. + */ + public void startMovingMessages() + { + _lock.lock(); + _movingMessages.set(true); + } + + /** + * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, + * so that the async delivery can start again. + */ + public void stopMovingMessages() + { + _movingMessages.set(false); + if (_lock.isHeldByCurrentThread()) + { + _lock.unlock(); + } + } + + /** + * Messages will be removed from this queue and all preDeliveryQueues + * @param messageList + */ + public void removeMovedMessages(List<AMQMessage> messageList) { + // Remove from the + boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); + if (hasSubscribers) + { + for (Subscription sub : _subscriptions.getSubscriptions()) + { + if (!sub.isSuspended() && sub.hasFilters()) + { + Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue(); + for (AMQMessage msg : messageList) + { + preDeliveryQueue.remove(msg); + } + } + } + } + for (AMQMessage msg : messageList) { if (_messages.remove(msg)) @@ -253,29 +306,42 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException + /** + * Now with implementation of predelivery queues, this method will mark the message on the top as taken. + * @param storeContext + * @throws AMQException + */ + public void removeAMessageFromTop(StoreContext storeContext) throws AMQException { - AMQMessage msg = poll(); + _lock.lock(); + AMQMessage msg = getNextMessage(); if (msg != null) { - msg.dequeue(storeContext, _queue); - _totalMessageSize.getAndAdd(-msg.getSize()); - } + // mark this message as taken and get it removed + msg.taken(); + _queue.dequeue(storeContext, msg); + getNextMessage(); + } + + _lock.unlock(); } - public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException + public long clearAllMessages(StoreContext storeContext) throws AMQException { long count = 0; - AMQMessage msg = poll(); + _lock.lock(); + + AMQMessage msg = getNextMessage(); while (msg != null) { - msg.dequeue(storeContext, _queue); + //mark this message as taken and get it removed + msg.taken(); + _queue.dequeue(storeContext, msg); + msg = getNextMessage(); count++; - _totalMessageSize.set(0L); - msg = poll(); - } + _lock.unlock(); return count; } @@ -298,6 +364,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //remove the already taken message messages.poll(); + _totalMessageSize.addAndGet(-message.getSize()); // try the next message message = messages.peek(); } @@ -335,6 +402,34 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** + * enqueues the messages in the list on the queue and all required predelivery queues + * @param storeContext + * @param movedMessageList + */ + public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList) + { + _lock.lock(); + for (AMQMessage msg : movedMessageList) + { + addMessageToQueue(msg); + } + + // enqueue on the pre delivery queues + for (Subscription sub : _subscriptions.getSubscriptions()) + { + for (AMQMessage msg : movedMessageList) + { + // Only give the message to those that want them. + if (sub.hasInterest(msg)) + { + sub.enqueueForPreDelivery(msg); + } + } + } + _lock.unlock(); + } + + /** * Only one thread should ever execute this method concurrently, but * it can do so while other threads invoke deliver(). */ @@ -343,7 +438,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - while (hasSubscribers && hasQueuedMessages()) + while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get()) { hasSubscribers = false; @@ -378,11 +473,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - private AMQMessage poll() - { - return _messages.poll(); - } - public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException { if (_log.isDebugEnabled()) @@ -482,7 +572,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void run() { boolean running = true; - while (running) + while (running && !_movingMessages.get()) { processQueue(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index c6f00bd189..0129c3b895 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -76,7 +76,13 @@ interface DeliveryManager long clearAllMessages(StoreContext storeContext) throws AMQException; - void removeMessages(List<AMQMessage> messageListToRemove); + void startMovingMessages(); + + void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList); + + void stopMovingMessages(); + + void removeMovedMessages(List<AMQMessage> messageListToRemove); List<AMQMessage> getMessages(); |