summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-07-17 08:35:35 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-07-17 08:35:35 +0000
commitbc77d792e85c348163d24a609f05f3772a7ae950 (patch)
treede1542dfb4d0652df4082c6620c6ff6d3972fc7f
parent8a4f51a91860724aefe63049268567e99d822ddd (diff)
downloadqpid-python-bc77d792e85c348163d24a609f05f3772a7ae950.tar.gz
Update to the MessageStore Tool to provide Move and Purge functionality.
Updated to remove the AMQExceptions that will be removed from the Exchange class. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556847 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java166
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java27
5 files changed, 247 insertions, 138 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
index 831cb90a4d..96ecb36952 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
@@ -20,20 +20,12 @@
*/
package org.apache.qpid.tools.messagestore.commands;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
-public class Copy extends AbstractCommand
+public class Copy extends Move
{
-
- /**
- * Since the Coopy command is not associated with a real channel we can safely create our own store context
- * for use in the few methods that require one.
- */
- private StoreContext _storeContext = new StoreContext();
-
public Copy(MessageStoreTool tool)
{
super(tool);
@@ -56,94 +48,9 @@ public class Copy extends AbstractCommand
return "copy";
}
- public void execute(String... args)
+ protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext)
{
- AMQQueue toQueue = null;
- AMQQueue fromQueue = _tool.getState().getQueue();
- java.util.List<Long> msgids = _tool.getState().getMessages();
-
- if (args.length >= 2)
- {
- for (String arg : args)
- {
- if (arg.startsWith("to="))
- {
- String queueName = arg.substring(arg.indexOf("=") + 1);
- toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- }
-
- if (arg.startsWith("from="))
- {
- String queueName = arg.substring(arg.indexOf("=") + 1);
- fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- }
-
- if (arg.startsWith("msgids="))
- {
- String msgidStr = arg.substring(arg.indexOf("=") + 1);
-
- // Record the current message selection
- java.util.List<Long> currentIDs = _tool.getState().getMessages();
-
- // Use the ToolState class to perform the messasge parsing
- _tool.getState().setMessages(msgidStr);
- msgids = _tool.getState().getMessages();
-
- // Reset the original selection of messages
- _tool.getState().setMessages(currentIDs);
- }
- }
- }
-
- if (toQueue == null)
- {
- _console.println("Queue to copy to not specifed.");
- _console.println(usage());
- return;
- }
-
- if (fromQueue == null)
- {
- _console.println("Queue to copy from not specifed.");
- _console.println(usage());
- return;
- }
-
- performCopy(fromQueue, toQueue, msgids);
+ fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), storeContext);
}
- protected void performCopy(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
- {
- Long previous = null;
- Long start = null;
-
- for (long id : msgids)
- {
- if (previous != null)
- {
- if (id == previous + 1)
- {
- if (start == null)
- {
- start = previous;
- }
- }
- else
- {
- if (start != null)
- {
- //move a range of ids
- fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
- }
- else
- {
- //move a single id
- fromQueue.moveMessagesToAnotherQueue(id, id, toQueue.getName().toString(), _storeContext);
- }
- }
- }
-
- previous = id;
- }
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
index 8a42718542..df8b59ec19 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.tools.messagestore.commands;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
@@ -234,17 +233,9 @@ public class List extends AbstractCommand
{
if (exchange != null)
{
- try
+ if (exchange.isBound(queue))
{
- if (exchange.isBound(queue))
- {
- data.add(queue.toString());
- }
- }
- catch (AMQException e)
- {
- // is never thrown by current impls forced to throw by interface.
- commandError("Unable to check exchange bindings: " + e.getMessage(), null);
+ data.add(queue.toString());
}
}
else
@@ -299,17 +290,9 @@ public class List extends AbstractCommand
{
if (exchange != null)
{
- try
- {
- if (exchange.isBound(queue))
- {
- data.add(queue.getName().toString());
- }
- }
- catch (AMQException e)
+ if (exchange.isBound(queue))
{
- // is never thrown by current impls forced to throw by interface.
- commandError("Unable to check exchange bindings: " + e.getMessage(), null);
+ data.add(queue.getName().toString());
}
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
new file mode 100644
index 0000000000..a9497fd23e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+import java.util.List;
+
+public class Move extends AbstractCommand
+{
+
+ /**
+ * Since the Coopy command is not associated with a real channel we can safely create our own store context
+ * for use in the few methods that require one.
+ */
+ private StoreContext _storeContext = new StoreContext();
+
+ public Move(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Move messages between queues.\n" +
+ "The currently selected message set will be moved to the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";
+ }
+
+ public String usage()
+ {
+ return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "move";
+ }
+
+ public void execute(String... args)
+ {
+ AMQQueue toQueue = null;
+ AMQQueue fromQueue = _tool.getState().getQueue();
+ java.util.List<Long> msgids = _tool.getState().getMessages();
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("to="))
+ {
+ String queueName = arg.substring(arg.indexOf("=") + 1);
+ toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ }
+
+ if (arg.startsWith("from="))
+ {
+ String queueName = arg.substring(arg.indexOf("=") + 1);
+ fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ }
+
+ if (arg.startsWith("msgids="))
+ {
+ String msgidStr = arg.substring(arg.indexOf("=") + 1);
+
+ // Record the current message selection
+ java.util.List<Long> currentIDs = _tool.getState().getMessages();
+
+ // Use the ToolState class to perform the messasge parsing
+ _tool.getState().setMessages(msgidStr);
+ msgids = _tool.getState().getMessages();
+
+ // Reset the original selection of messages
+ _tool.getState().setMessages(currentIDs);
+ }
+ }
+ }
+
+ if (!checkRequirements(fromQueue, toQueue, msgids))
+ {
+ return;
+ }
+
+ processIDs(fromQueue, toQueue, msgids);
+ }
+
+ private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
+ {
+ Long previous = null;
+ Long start = null;
+
+ for (long id : msgids)
+ {
+ if (previous != null)
+ {
+ if (id == previous + 1)
+ {
+ if (start == null)
+ {
+ start = previous;
+ }
+ }
+ else
+ {
+ if (start != null)
+ {
+ //move a range of ids
+ doCommand(fromQueue, start, id, toQueue, _storeContext);
+ }
+ else
+ {
+ //move a single id
+ doCommand(fromQueue, id, id, toQueue, _storeContext);
+ }
+ }
+ }
+
+ previous = id;
+ }
+ }
+
+ protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids)
+ {
+ if (toQueue == null)
+ {
+ _console.println("Destination queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ if (fromQueue == null)
+ {
+ _console.println("Source queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue, StoreContext storeContext)
+ {
+ fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
new file mode 100644
index 0000000000..7154159b40
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Purge extends Move
+{
+ public Purge(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Purge messages from a queue.\n" +
+ "The currently selected message set will be purged from the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";
+ }
+
+ public String usage()
+ {
+ return "purge from=<queue> [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "purge";
+ }
+
+
+ protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
+ {
+ if (fromQueue == null)
+ {
+ _console.println("Source queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext)
+ {
+ fromQueue.removeMessagesFromQueue(start, end, storeContext);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
index 39f35da912..5e9b7028e9 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.tools.messagestore.commands;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
@@ -132,16 +131,9 @@ public class Select extends AbstractCommand
if (_tool.getState().getQueue() != null)
{
- try
+ if (!exchange.isBound(_tool.getState().getQueue()))
{
- if (!exchange.isBound(_tool.getState().getQueue()))
- {
- _tool.getState().setQueue(null);
- }
- }
- catch (AMQException e)
- {
- //ignore
+ _tool.getState().setQueue(null);
}
}
}
@@ -170,18 +162,11 @@ public class Select extends AbstractCommand
{
for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames())
{
- try
- {
- Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
- if (exchange.isBound(queue))
- {
- _tool.getState().setExchange(exchange);
- break;
- }
- }
- catch (AMQException e)
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ if (exchange.isBound(queue))
{
- //ignore error
+ _tool.getState().setExchange(exchange);
+ break;
}
}
}