diff options
Diffstat (limited to 'java')
4 files changed, 177 insertions, 77 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index ce1db7d26e..c114d2d25f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -41,6 +41,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like @@ -157,7 +158,7 @@ public class AMQQueue implements Managable, Comparable /** * total messages received by the queue since startup. */ - public long _totalMessagesReceived = 0; + public AtomicLong _totalMessagesReceived = new AtomicLong(); public int compareTo(Object o) { @@ -291,59 +292,77 @@ public class AMQQueue implements Managable, Comparable } /** - * @see ManagedQueue#moveMessages + * moves messages from this queue to another queue. to do this the approach is following- + * - setup the queue for moving messages (hold the lock and stop the async delivery) + * - get all the messages available in the given message id range + * - setup the other queue for moving messages (hold the lock and stop the async delivery) + * - send these available messages to the other queue (enqueue in other queue) + * - Once sending to other Queue is successful, remove messages from this queue + * - remove locks from both queues and start async delivery * @param fromMessageId * @param toMessageId * @param queueName * @param storeContext - * @throws AMQException */ public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - StoreContext storeContext) throws AMQException + StoreContext storeContext) { + // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - List<AMQMessage> list = getMessagesOnTheQueue(); - List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); - int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1); - for (AMQMessage message : list) + try { - long msgId = message.getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) - { - foundMessagesList.add(message); - } - // break the loop as soon as messages to be removed are found - if (foundMessagesList.size() == maxMessageCountToBeMoved) + startMovingMessages(); + List<AMQMessage> list = getMessagesOnTheQueue(); + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); + int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1); + + // Run this loop till you find all the messages or the list has no more messages + for (AMQMessage message : list) { - break; + long msgId = message.getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(message); + } + // break the loop as soon as messages to be removed are found + if (foundMessagesList.size() == maxMessageCountToBeMoved) + { + break; + } } - } - // move messages to another queue - for (AMQMessage message : foundMessagesList) + // move messages to another queue + anotherQueue.startMovingMessages(); + anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList); + + // moving is successful, now remove from original queue + _deliveryMgr.removeMovedMessages(foundMessagesList); + } + finally { - try - { - anotherQueue.process(storeContext, message); - } - catch(AMQException ex) - { - foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear(); - // Exception occured, so rollback the changes - anotherQueue.removeMessages(foundMessagesList); - throw ex; - } + // remove the lock and start the async delivery + anotherQueue.stopMovingMessages(); + stopMovingMessages(); } + } - // moving is successful, now remove from original queue - removeMessages(foundMessagesList); + public void startMovingMessages() + { + _deliveryMgr.startMovingMessages(); } - public synchronized void removeMessages(List<AMQMessage> messageList) + private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList) { - _deliveryMgr.removeMessages(messageList); + _deliveryMgr.enqueueMovedMessages(storeContext, messageList); + _totalMessagesReceived.addAndGet(messageList.size()); } + public void stopMovingMessages() + { + _deliveryMgr.stopMovingMessages(); + _deliveryMgr.processAsync(_asyncDelivery); + } + /** * @return MBean object associated with this Queue */ @@ -374,7 +393,7 @@ public class AMQQueue implements Managable, Comparable public long getReceivedMessageCount() { - return _totalMessagesReceived; + return _totalMessagesReceived.get(); } public int getMaximumMessageCount() @@ -407,7 +426,7 @@ public class AMQQueue implements Managable, Comparable /** * Removes the AMQMessage from the top of the queue. */ - public void deleteMessageFromTop(StoreContext storeContext) throws AMQException + public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { _deliveryMgr.removeAMessageFromTop(storeContext); } @@ -415,7 +434,7 @@ public class AMQQueue implements Managable, Comparable /** * removes all the messages from the queue. */ - public long clearQueue(StoreContext storeContext) throws AMQException + public synchronized long clearQueue(StoreContext storeContext) throws AMQException { return _deliveryMgr.clearAllMessages(storeContext); } @@ -633,7 +652,7 @@ public class AMQQueue implements Managable, Comparable protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException { - _totalMessagesReceived++; + _totalMessagesReceived.incrementAndGet(); try { _managedObject.checkForNotification(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index ba0d3b86d2..eb7e87b923 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -395,24 +395,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); } - - try - { - _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); - } - catch(AMQException amqex) - { - throw new JMException("Error moving messages to " + toQueueName + ": " + amqex); - } + _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); } -// -// public ObjectName getObjectName() throws MalformedObjectNameException -// { -// String objNameString = super.getObjectName().toString(); -// -// return new ObjectName(objNameString); -// } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 6c89101043..858dd5ee09 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -75,7 +75,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private final AMQQueue _queue; - + /** + * Flag used while moving messages from this queue to another. For moving messages the async delivery + * should also stop. This flat should be set to true to stop async delivery and set to false to enable + * async delivery again. + */ + private AtomicBoolean _movingMessages = new AtomicBoolean(); + /** * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered @@ -167,9 +173,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - public synchronized List<AMQMessage> getMessages() + public List<AMQMessage> getMessages() { - return new ArrayList<AMQMessage>(_messages); + _lock.lock(); + ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages); + _lock.unlock(); + return list; } public void populatePreDeliveryQueue(Subscription subscription) @@ -242,8 +251,52 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void removeMessages(List<AMQMessage> messageList) + /** + * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, + * so that the asyn delivery is also stopped. + */ + public void startMovingMessages() + { + _lock.lock(); + _movingMessages.set(true); + } + + /** + * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, + * so that the async delivery can start again. + */ + public void stopMovingMessages() + { + _movingMessages.set(false); + if (_lock.isHeldByCurrentThread()) + { + _lock.unlock(); + } + } + + /** + * Messages will be removed from this queue and all preDeliveryQueues + * @param messageList + */ + public void removeMovedMessages(List<AMQMessage> messageList) { + // Remove from the + boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); + if (hasSubscribers) + { + for (Subscription sub : _subscriptions.getSubscriptions()) + { + if (!sub.isSuspended() && sub.hasFilters()) + { + Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue(); + for (AMQMessage msg : messageList) + { + preDeliveryQueue.remove(msg); + } + } + } + } + for (AMQMessage msg : messageList) { if (_messages.remove(msg)) @@ -253,29 +306,42 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException + /** + * Now with implementation of predelivery queues, this method will mark the message on the top as taken. + * @param storeContext + * @throws AMQException + */ + public void removeAMessageFromTop(StoreContext storeContext) throws AMQException { - AMQMessage msg = poll(); + _lock.lock(); + AMQMessage msg = getNextMessage(); if (msg != null) { - msg.dequeue(storeContext, _queue); - _totalMessageSize.getAndAdd(-msg.getSize()); - } + // mark this message as taken and get it removed + msg.taken(); + _queue.dequeue(storeContext, msg); + getNextMessage(); + } + + _lock.unlock(); } - public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException + public long clearAllMessages(StoreContext storeContext) throws AMQException { long count = 0; - AMQMessage msg = poll(); + _lock.lock(); + + AMQMessage msg = getNextMessage(); while (msg != null) { - msg.dequeue(storeContext, _queue); + //mark this message as taken and get it removed + msg.taken(); + _queue.dequeue(storeContext, msg); + msg = getNextMessage(); count++; - _totalMessageSize.set(0L); - msg = poll(); - } + _lock.unlock(); return count; } @@ -298,6 +364,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //remove the already taken message messages.poll(); + _totalMessageSize.addAndGet(-message.getSize()); // try the next message message = messages.peek(); } @@ -335,6 +402,34 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** + * enqueues the messages in the list on the queue and all required predelivery queues + * @param storeContext + * @param movedMessageList + */ + public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList) + { + _lock.lock(); + for (AMQMessage msg : movedMessageList) + { + addMessageToQueue(msg); + } + + // enqueue on the pre delivery queues + for (Subscription sub : _subscriptions.getSubscriptions()) + { + for (AMQMessage msg : movedMessageList) + { + // Only give the message to those that want them. + if (sub.hasInterest(msg)) + { + sub.enqueueForPreDelivery(msg); + } + } + } + _lock.unlock(); + } + + /** * Only one thread should ever execute this method concurrently, but * it can do so while other threads invoke deliver(). */ @@ -343,7 +438,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - while (hasSubscribers && hasQueuedMessages()) + while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get()) { hasSubscribers = false; @@ -378,11 +473,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - private AMQMessage poll() - { - return _messages.poll(); - } - public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException { if (_log.isDebugEnabled()) @@ -482,7 +572,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void run() { boolean running = true; - while (running) + while (running && !_movingMessages.get()) { processQueue(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index c6f00bd189..0129c3b895 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -76,7 +76,13 @@ interface DeliveryManager long clearAllMessages(StoreContext storeContext) throws AMQException; - void removeMessages(List<AMQMessage> messageListToRemove); + void startMovingMessages(); + + void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList); + + void stopMovingMessages(); + + void removeMovedMessages(List<AMQMessage> messageListToRemove); List<AMQMessage> getMessages(); |