summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-02-07 11:27:15 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-02-07 11:27:15 +0000
commitb37d5db6ef5e5e9363a1e791dcf9a419bc348bb1 (patch)
tree83d4b469a2fe4b7876d2bc787857e0984bf0f94e /java/broker
parent020a924c742b185ef5360108985460f0155eb671 (diff)
downloadqpid-python-b37d5db6ef5e5e9363a1e791dcf9a419bc348bb1.tar.gz
QPID-170
Management feature added - moving messages from one Queue to another git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@504507 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java16
5 files changed, 108 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index aa372a3b99..ce1db7d26e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -101,11 +102,8 @@ public class AMQQueue implements Managable, Comparable
private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
/**
* Manages message delivery.
*/
@@ -293,6 +291,60 @@ public class AMQQueue implements Managable, Comparable
}
/**
+ * @see ManagedQueue#moveMessages
+ * @param fromMessageId
+ * @param toMessageId
+ * @param queueName
+ * @param storeContext
+ * @throws AMQException
+ */
+ public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+ StoreContext storeContext) throws AMQException
+ {
+ AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+ for (AMQMessage message : list)
+ {
+ long msgId = message.getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(message);
+ }
+ // break the loop as soon as messages to be removed are found
+ if (foundMessagesList.size() == maxMessageCountToBeMoved)
+ {
+ break;
+ }
+ }
+
+ // move messages to another queue
+ for (AMQMessage message : foundMessagesList)
+ {
+ try
+ {
+ anotherQueue.process(storeContext, message);
+ }
+ catch(AMQException ex)
+ {
+ foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear();
+ // Exception occured, so rollback the changes
+ anotherQueue.removeMessages(foundMessagesList);
+ throw ex;
+ }
+ }
+
+ // moving is successful, now remove from original queue
+ removeMessages(foundMessagesList);
+ }
+
+ public synchronized void removeMessages(List<AMQMessage> messageList)
+ {
+ _deliveryMgr.removeMessages(messageList);
+ }
+
+ /**
* @return MBean object associated with this Queue
*/
public ManagedObject getManagedObject()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index a263350cb0..ba0d3b86d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -381,6 +381,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _messageList;
}
+
+ /**
+ * @see ManagedQueue#moveMessages
+ * @param fromMessageId
+ * @param toMessageId
+ * @param toQueueName
+ * @throws JMException
+ */
+ public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
+ {
+ if (fromMessageId > toMessageId || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
+ }
+
+ try
+ {
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+ }
+ catch(AMQException amqex)
+ {
+ throw new JMException("Error moving messages to " + toQueueName + ": " + amqex);
+ }
+
+ }
//
// public ObjectName getObjectName() throws MalformedObjectNameException
// {
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 3a9ce64c57..6c89101043 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -242,6 +242,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ public synchronized void removeMessages(List<AMQMessage> messageList)
+ {
+ for (AMQMessage msg : messageList)
+ {
+ if (_messages.remove(msg))
+ {
+ _totalMessageSize.getAndAdd(-msg.getSize());
+ }
+ }
+ }
public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index f7820e1465..c6f00bd189 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -76,6 +76,8 @@ interface DeliveryManager
long clearAllMessages(StoreContext storeContext) throws AMQException;
+ void removeMessages(List<AMQMessage> messageListToRemove);
+
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
index 81580d8db5..67bc830cf6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
@@ -225,4 +225,20 @@ public interface ManagedQueue
impact= MBeanOperationInfo.ACTION)
void clearQueue() throws IOException, JMException;
+ /**
+ * Moves the messages in given range of message Ids to given Queue. QPID-170
+ * @param fromMessageId first in the range of message ids
+ * @param toMessageId last in the range of message ids
+ * @param toQueue where the messages are to be moved
+ * @throws IOException
+ * @throws JMException
+ * @throws AMQException
+ */
+ @MBeanOperation(name="moveMessages",
+ description="You can move messages to another queue from this queue ",
+ impact= MBeanOperationInfo.ACTION)
+ void moveMessages(@MBeanOperationParameter(name="from MessageId", description="from MessageId")long fromMessageId,
+ @MBeanOperationParameter(name="to MessageId", description="to MessageId")long toMessageId,
+ @MBeanOperationParameter(name= ManagedQueue.TYPE, description="to Queue Name")String toQueue)
+ throws IOException, JMException, AMQException;
}