summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-02-07 11:27:15 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-02-07 11:27:15 +0000
commitb37d5db6ef5e5e9363a1e791dcf9a419bc348bb1 (patch)
tree83d4b469a2fe4b7876d2bc787857e0984bf0f94e /java
parent020a924c742b185ef5360108985460f0155eb671 (diff)
downloadqpid-python-b37d5db6ef5e5e9363a1e791dcf9a419bc348bb1.tar.gz
QPID-170
Management feature added - moving messages from one Queue to another git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@504507 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java12
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java1
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java2
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java9
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java4
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/OperationDataModel.java5
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/ParameterData.java13
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java10
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java47
14 files changed, 179 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index aa372a3b99..ce1db7d26e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -101,11 +102,8 @@ public class AMQQueue implements Managable, Comparable
private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
/**
* Manages message delivery.
*/
@@ -293,6 +291,60 @@ public class AMQQueue implements Managable, Comparable
}
/**
+ * @see ManagedQueue#moveMessages
+ * @param fromMessageId
+ * @param toMessageId
+ * @param queueName
+ * @param storeContext
+ * @throws AMQException
+ */
+ public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+ StoreContext storeContext) throws AMQException
+ {
+ AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+ for (AMQMessage message : list)
+ {
+ long msgId = message.getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(message);
+ }
+ // break the loop as soon as messages to be removed are found
+ if (foundMessagesList.size() == maxMessageCountToBeMoved)
+ {
+ break;
+ }
+ }
+
+ // move messages to another queue
+ for (AMQMessage message : foundMessagesList)
+ {
+ try
+ {
+ anotherQueue.process(storeContext, message);
+ }
+ catch(AMQException ex)
+ {
+ foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear();
+ // Exception occured, so rollback the changes
+ anotherQueue.removeMessages(foundMessagesList);
+ throw ex;
+ }
+ }
+
+ // moving is successful, now remove from original queue
+ removeMessages(foundMessagesList);
+ }
+
+ public synchronized void removeMessages(List<AMQMessage> messageList)
+ {
+ _deliveryMgr.removeMessages(messageList);
+ }
+
+ /**
* @return MBean object associated with this Queue
*/
public ManagedObject getManagedObject()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index a263350cb0..ba0d3b86d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -381,6 +381,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _messageList;
}
+
+ /**
+ * @see ManagedQueue#moveMessages
+ * @param fromMessageId
+ * @param toMessageId
+ * @param toQueueName
+ * @throws JMException
+ */
+ public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
+ {
+ if (fromMessageId > toMessageId || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
+ }
+
+ try
+ {
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+ }
+ catch(AMQException amqex)
+ {
+ throw new JMException("Error moving messages to " + toQueueName + ": " + amqex);
+ }
+
+ }
//
// public ObjectName getObjectName() throws MalformedObjectNameException
// {
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 3a9ce64c57..6c89101043 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -242,6 +242,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ public synchronized void removeMessages(List<AMQMessage> messageList)
+ {
+ for (AMQMessage msg : messageList)
+ {
+ if (_messages.remove(msg))
+ {
+ _totalMessageSize.getAndAdd(-msg.getSize());
+ }
+ }
+ }
public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index f7820e1465..c6f00bd189 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -76,6 +76,8 @@ interface DeliveryManager
long clearAllMessages(StoreContext storeContext) throws AMQException;
+ void removeMessages(List<AMQMessage> messageListToRemove);
+
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
index 81580d8db5..67bc830cf6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
@@ -225,4 +225,20 @@ public interface ManagedQueue
impact= MBeanOperationInfo.ACTION)
void clearQueue() throws IOException, JMException;
+ /**
+ * Moves the messages in given range of message Ids to given Queue. QPID-170
+ * @param fromMessageId first in the range of message ids
+ * @param toMessageId last in the range of message ids
+ * @param toQueue where the messages are to be moved
+ * @throws IOException
+ * @throws JMException
+ * @throws AMQException
+ */
+ @MBeanOperation(name="moveMessages",
+ description="You can move messages to another queue from this queue ",
+ impact= MBeanOperationInfo.ACTION)
+ void moveMessages(@MBeanOperationParameter(name="from MessageId", description="from MessageId")long fromMessageId,
+ @MBeanOperationParameter(name="to MessageId", description="to MessageId")long toMessageId,
+ @MBeanOperationParameter(name= ManagedQueue.TYPE, description="to Queue Name")String toQueue)
+ throws IOException, JMException, AMQException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
index c2d758611d..c4d7683a02 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
@@ -55,4 +55,16 @@ public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E>
return e;
}
+
+ @Override
+ public boolean remove(Object o)
+ {
+ if (super.remove(o))
+ {
+ _size.decrementAndGet();
+ return true;
+ }
+
+ return false;
+ }
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
index 5c3b45a8f2..e3e22aa7ee 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java
@@ -50,6 +50,7 @@ public class Constants
public final static String ATTRIBUTE_QUEUE_CONSUMERCOUNT = "ActiveConsumerCount";
public final static String OPERATION_CREATE_QUEUE = "createNewQueue";
public final static String OPERATION_CREATE_BINDING = "createNewBinding";
+ public final static String OPERATION_MOVE_MESSAGES = "moveMessages";
public final static String ALL = "All";
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
index b498454fc1..32e2290c2a 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ServerRegistry.java
@@ -128,7 +128,7 @@ public abstract class ServerRegistry
public abstract OperationDataModel getOperationModel(ManagedBean mbean);
- public abstract String[] getQueueNames(String vistualHostName);
+ public abstract List<String> getQueueNames(String vistualHostName);
public abstract String[] getExchangeNames(String vistualHostName);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
index 882fdfb038..c5988fd480 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
@@ -387,19 +387,18 @@ public class JMXServerRegistry extends ServerRegistry
return _operationModelMap.get(mbean.getUniqueName());
}
- public String[] getQueueNames(String virtualHostName)
+ public List<String> getQueueNames(String virtualHostName)
{
List<ManagedBean> list = getQueues(virtualHostName);
if (list == null)
return null;
- String[] queues = new String[list.size()];
- int i = 0;
+ List<String> queueNames = new ArrayList<String>();
for (ManagedBean mbean : list)
{
- queues[i++] = mbean.getName();
+ queueNames.add(mbean.getName());
}
- return queues;
+ return queueNames;
}
public String[] getExchangeNames(String virtualHostName)
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java
index 1a1de11e30..e7361ddec7 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java
@@ -163,7 +163,7 @@ public class MBeanUtility
}
else if (ex instanceof MBeanException)
{
- String cause = ((MBeanException)ex).getTargetException().toString();
+ String cause = ((MBeanException)ex).getTargetException().getMessage();
if (cause == null)
cause = ex.toString();
ViewUtility.popupInfoMessage(mbean.getInstanceName(), cause);
@@ -178,7 +178,7 @@ public class MBeanUtility
}
else
{
- ViewUtility.popupErrorMessage(mbean.getInstanceName(), ex.toString());
+ ViewUtility.popupErrorMessage(mbean.getInstanceName(), ex.getMessage());
ex.printStackTrace();
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/OperationDataModel.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/OperationDataModel.java
index 2df36ee8c6..96964a81ef 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/OperationDataModel.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/OperationDataModel.java
@@ -45,9 +45,8 @@ public class OperationDataModel
for (int i = 0; i < parametersCount; i++)
{
MBeanParameterInfo paramInfo = opInfo.getSignature()[i];
- ParameterData param = new ParameterData(paramInfo.getName());
- param.setDescription(paramInfo.getDescription());
- param.setType(paramInfo.getType());
+ ParameterData param = new ParameterData(paramInfo.getName(), paramInfo.getDescription(),
+ paramInfo.getType());
paramList.add(param);
}
opData.setParameters(paramList);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/ParameterData.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/ParameterData.java
index 9d1d44559e..d12217c6eb 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/ParameterData.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/model/ParameterData.java
@@ -31,19 +31,18 @@ public class ParameterData
private String _type;
private Object _value;
- ParameterData(String name)
+ ParameterData(String name, String desc, String type)
{
this._name = name;
+ this._description = desc;
+ this._type = type;
+ setDefaultValue();
}
public String getDescription()
{
return _description;
}
- public void setDescription(String description)
- {
- this._description = description;
- }
public String getName()
{
@@ -54,10 +53,6 @@ public class ParameterData
{
return _type;
}
- public void setType(String type)
- {
- this._type = type;
- }
public Object getValue()
{
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java
index e3cd92f38a..cdb4aa99a8 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/MBeanView.java
@@ -39,6 +39,7 @@ import org.eclipse.swt.layout.FormAttachment;
import org.eclipse.swt.layout.FormData;
import org.eclipse.swt.layout.FormLayout;
import org.eclipse.swt.widgets.Composite;
+import org.eclipse.swt.widgets.Control;
import org.eclipse.swt.widgets.Event;
import org.eclipse.swt.widgets.Listener;
import org.eclipse.swt.widgets.TabFolder;
@@ -184,6 +185,15 @@ public class MBeanView extends ViewPart
}
TabFolder tabFolder = tabFolderMap.get(_mbean.getType());
+ /*
+ * This solution can be used if there are many versions of Qpid running. Otherwise
+ * there is no need to create a tabFolder everytime a bean is selected.
+ if (tabFolder != null && !tabFolder.isDisposed())
+ {
+ tabFolder.dispose();
+ }
+ tabFolder = createTabFolder();
+ */
if (tabFolder == null)
{
tabFolder = createTabFolder();
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
index c95c9e2bd1..7298e287ae 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
@@ -255,7 +255,14 @@ public class OperationTabControl extends TabControl
String[] items = null;
if (param.getName().equals(Constants.QUEUE))
{
- items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_virtualHostName);
+ List<String> qList = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_virtualHostName);
+ // Customization for AMQQueueMBean method Constants.OPERATION_MOVE_MESSAGES
+ if (_opData.getName().equals(Constants.OPERATION_MOVE_MESSAGES))
+ {
+ qList.remove(_mbean.getName());
+ }
+ // End of Customization
+ items = qList.toArray(new String[0]);
}
else if (param.getName().equals(Constants.EXCHANGE))
{
@@ -269,8 +276,14 @@ public class OperationTabControl extends TabControl
if (items != null)
{
org.eclipse.swt.widgets.List _list = new org.eclipse.swt.widgets.List(_paramsComposite, SWT.BORDER | SWT.V_SCROLL);
- int listSize = _form.getClientArea().height / 3;
+ int listSize = _form.getClientArea().height * 2 / 3;
int itemsHeight = items.length * (_list.getItemHeight() + 2);
+ // Set a min height for the list widget (set it to min 4 items)
+ if (items.length < 4)
+ {
+ itemsHeight = 4 * (_list.getItemHeight() + 2);
+ }
+
listSize = (listSize > itemsHeight) ? itemsHeight : listSize;
parameterPositionOffset = parameterPositionOffset + listSize;
formData.bottom = new FormAttachment(0, parameterPositionOffset);
@@ -296,7 +309,9 @@ public class OperationTabControl extends TabControl
formData.left = new FormAttachment(label, 5);
formData.right = new FormAttachment(valueWidth);
text.setLayoutData(formData);
+ // Listener to assign value to the parameter
text.addKeyListener(keyListener);
+ // Listener to verify if the entered key is valid
text.addVerifyListener(verifyListener);
text.setData(param);
}
@@ -358,9 +373,9 @@ public class OperationTabControl extends TabControl
formData.left = new FormAttachment(label, 5);
formData.right = new FormAttachment(valueWidth);
- Combo combo = new Combo(composite, SWT.READ_ONLY | SWT.DROP_DOWN);
- String[] items = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_virtualHostName);
- combo.setItems(items);
+ Combo combo = new Combo(composite, SWT.READ_ONLY | SWT.DROP_DOWN);
+ List<String> qList = ApplicationRegistry.getServerRegistry(_mbean).getQueueNames(_virtualHostName);
+ combo.setItems(qList.toArray(new String[0]));
combo.add("Select Queue", 0);
combo.select(0);
combo.setLayoutData(formData);
@@ -513,6 +528,8 @@ public class OperationTabControl extends TabControl
{
if (controls[i] instanceof Combo)
((Combo)controls[i]).select(0);
+ if (controls[i] instanceof org.eclipse.swt.widgets.List)
+ ((org.eclipse.swt.widgets.List)controls[i]).deselectAll();
else if (controls[i] instanceof Text)
((Text)controls[i]).setText("");
else if (controls[i] instanceof Composite)
@@ -685,7 +702,15 @@ public class OperationTabControl extends TabControl
// Get the parameters widget and assign the text to the parameter
String strValue = text.getText();
ParameterData parameter = (ParameterData)text.getData();
- parameter.setValueFromString(strValue);
+ try
+ {
+ parameter.setValueFromString(strValue);
+ }
+ catch(Exception ex)
+ {
+ // Exception occured in setting parameter value.
+ // ignore it. The value will not be assigned to the parameter
+ }
}
}
@@ -727,12 +752,10 @@ public class OperationTabControl extends TabControl
{
public void verifyText(VerifyEvent event)
{
- Text text = (Text)event.widget;
- String string = event.text;
- char [] chars = new char [string.length ()];
- string.getChars (0, chars.length, chars, 0);
-
- ParameterData parameter = (ParameterData)text.getData();
+ ParameterData parameter = (ParameterData)event.widget.getData();
+ String text = event.text;
+ char [] chars = new char [text.length ()];
+ text.getChars(0, chars.length, chars, 0);
String type = parameter.getType();
if (type.equals("int") || type.equals("java.lang.Integer") ||
type.equals("long") || type.equals("java.lang.Long"))