diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-02-07 11:27:15 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-02-07 11:27:15 +0000 |
commit | b37d5db6ef5e5e9363a1e791dcf9a419bc348bb1 (patch) | |
tree | 83d4b469a2fe4b7876d2bc787857e0984bf0f94e /java/broker | |
parent | 020a924c742b185ef5360108985460f0155eb671 (diff) | |
download | qpid-python-b37d5db6ef5e5e9363a1e791dcf9a419bc348bb1.tar.gz |
QPID-170
Management feature added - moving messages from one Queue to another
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@504507 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
5 files changed, 108 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index aa372a3b99..ce1db7d26e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; +import java.util.ArrayList; import java.util.concurrent.Executor; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -101,11 +102,8 @@ public class AMQQueue implements Managable, Comparable private final AtomicBoolean _deleted = new AtomicBoolean(false); - - private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - /** * Manages message delivery. */ @@ -293,6 +291,60 @@ public class AMQQueue implements Managable, Comparable } /** + * @see ManagedQueue#moveMessages + * @param fromMessageId + * @param toMessageId + * @param queueName + * @param storeContext + * @throws AMQException + */ + public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, + StoreContext storeContext) throws AMQException + { + AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + List<AMQMessage> list = getMessagesOnTheQueue(); + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); + int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1); + for (AMQMessage message : list) + { + long msgId = message.getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(message); + } + // break the loop as soon as messages to be removed are found + if (foundMessagesList.size() == maxMessageCountToBeMoved) + { + break; + } + } + + // move messages to another queue + for (AMQMessage message : foundMessagesList) + { + try + { + anotherQueue.process(storeContext, message); + } + catch(AMQException ex) + { + foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear(); + // Exception occured, so rollback the changes + anotherQueue.removeMessages(foundMessagesList); + throw ex; + } + } + + // moving is successful, now remove from original queue + removeMessages(foundMessagesList); + } + + public synchronized void removeMessages(List<AMQMessage> messageList) + { + _deliveryMgr.removeMessages(messageList); + } + + /** * @return MBean object associated with this Queue */ public ManagedObject getManagedObject() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index a263350cb0..ba0d3b86d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -381,6 +381,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _messageList; } + + /** + * @see ManagedQueue#moveMessages + * @param fromMessageId + * @param toMessageId + * @param toQueueName + * @throws JMException + */ + public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException + { + if (fromMessageId > toMessageId || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); + } + + try + { + _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); + } + catch(AMQException amqex) + { + throw new JMException("Error moving messages to " + toQueueName + ": " + amqex); + } + + } // // public ObjectName getObjectName() throws MalformedObjectNameException // { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 3a9ce64c57..6c89101043 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -242,6 +242,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + public synchronized void removeMessages(List<AMQMessage> messageList) + { + for (AMQMessage msg : messageList) + { + if (_messages.remove(msg)) + { + _totalMessageSize.getAndAdd(-msg.getSize()); + } + } + } public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index f7820e1465..c6f00bd189 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -76,6 +76,8 @@ interface DeliveryManager long clearAllMessages(StoreContext storeContext) throws AMQException; + void removeMessages(List<AMQMessage> messageListToRemove); + List<AMQMessage> getMessages(); void populatePreDeliveryQueue(Subscription subscription); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java index 81580d8db5..67bc830cf6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java @@ -225,4 +225,20 @@ public interface ManagedQueue impact= MBeanOperationInfo.ACTION) void clearQueue() throws IOException, JMException; + /** + * Moves the messages in given range of message Ids to given Queue. QPID-170 + * @param fromMessageId first in the range of message ids + * @param toMessageId last in the range of message ids + * @param toQueue where the messages are to be moved + * @throws IOException + * @throws JMException + * @throws AMQException + */ + @MBeanOperation(name="moveMessages", + description="You can move messages to another queue from this queue ", + impact= MBeanOperationInfo.ACTION) + void moveMessages(@MBeanOperationParameter(name="from MessageId", description="from MessageId")long fromMessageId, + @MBeanOperationParameter(name="to MessageId", description="to MessageId")long toMessageId, + @MBeanOperationParameter(name= ManagedQueue.TYPE, description="to Queue Name")String toQueue) + throws IOException, JMException, AMQException; } |